~ubuntu-branches/ubuntu/quantal/udisks2/quantal-security

« back to all changes in this revision

Viewing changes to .pc/mount_in_media.patch/src/tests/integration-test

  • Committer: Package Import Robot
  • Author(s): Martin Pitt
  • Date: 2012-09-06 10:31:42 UTC
  • Revision ID: package-import@ubuntu.com-20120906103142-mr72ukpltd5kqkp7
Tags: 1.99.0-3
* Add 00git_dev_t_annotation.patch: Add workaround annotation for
  udisks_client_get_block_for_dev(), to fix UDisks.Block.get_block_for_dev()
  on 32 bit platforms when calling it through introspection. Patch also
  committed to upstream git.
* Add 00git_testsuite.patch: Pull latest test suite updates from trunk. This
  includes a new test case for permissions for removable devices, plus some
  race condition fixes.
* mount_in_media.patch: Drop the test suite portion, included in previous
  patch.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#!/usr/bin/python3
2
 
#
3
 
# udisks2 integration test suite
4
 
#
5
 
# Run in udisks built tree to test local built binaries (needs
6
 
# --localstatedir=/var), or from anywhere else to test system installed
7
 
# binaries. 
8
 
#
9
 
# Usage:
10
 
# - Run all tests: 
11
 
#   src/tests/integration-test
12
 
# - Run only a particular class of tests:
13
 
#   src/tests/integration-test Drive
14
 
# - Run only a single test:
15
 
#   src/tests/integration-test FS.test_ext3
16
 
#
17
 
# Copyright: (C) 2011 Martin Pitt <martin.pitt@ubuntu.com>
18
 
#
19
 
# This program is free software; you can redistribute it and/or modify
20
 
# it under the terms of the GNU General Public License as published by
21
 
# the Free Software Foundation; either version 2 of the License, or
22
 
# (at your option) any later version.
23
 
#
24
 
# This program is distributed in the hope that it will be useful,
25
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
26
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
27
 
# GNU General Public License for more details.
28
 
 
29
 
# TODO:
30
 
# - add and test method for changing LUKS passphrase
31
 
# - test Format with take-ownership
32
 
 
33
 
import sys
34
 
import os
35
 
import pwd
36
 
 
37
 
srcdir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
38
 
libdir = os.path.join(srcdir, 'udisks', '.libs')
39
 
 
40
 
# as we can't change LD_LIBRARY_PATH within a running program, and doing
41
 
# #!/usr/bin/env LD_LIBRARY_PATH=... python3 does not work either, do this
42
 
# nasty hack
43
 
if 'LD_LIBRARY_PATH' not in os.environ and os.path.isdir(libdir):
44
 
    os.environ['LD_LIBRARY_PATH'] = libdir
45
 
    os.environ['GI_TYPELIB_PATH'] = '%s/udisks:%s' % (
46
 
            srcdir,
47
 
            os.environ.get('GI_TYPELIB_PATH', ''))
48
 
    os.execv(sys.argv[0], sys.argv)
49
 
    assert False, 'not expecting to land here'
50
 
 
51
 
import subprocess
52
 
import unittest
53
 
import tempfile
54
 
import atexit
55
 
import time
56
 
import shutil
57
 
import signal
58
 
import argparse
59
 
import re
60
 
from glob import glob
61
 
from gi.repository import GLib, Gio, UDisks
62
 
 
63
 
# find local test_polkit.py
64
 
sys.path.insert(0, os.path.dirname(__file__))
65
 
import test_polkitd
66
 
 
67
 
#GI_TYPELIB_PATH=udisks LD_LIBRARY_PATH=udisks/.libs
68
 
VDEV_SIZE = 300000000 # size of virtual test device
69
 
 
70
 
# Those file systems are known to have a broken handling of permissions, in
71
 
# particular the executable bit
72
 
BROKEN_PERMISSIONS_FS = ['ntfs']
73
 
 
74
 
# Some D-BUS API methods cause properties to not be up to date yet when a
75
 
# method call finishes, thus we do an udevadm settle as a workaround. Those
76
 
# methods should eventually get fixed properly, but it's unnerving to have
77
 
# the tests fail on them when you are working on something else. This flag
78
 
# gets set by the --no-workarounds option to disable those syncs, so that these
79
 
# race conditions can be fixed.
80
 
workaround_syncs = False
81
 
 
82
 
no_options = GLib.Variant('a{sv}', {})
83
 
 
84
 
# ----------------------------------------------------------------------------
85
 
 
86
 
class UDisksTestCase(unittest.TestCase):
87
 
    '''Base class for udisks test cases.
88
 
    
89
 
    This provides static functions which are useful for all test cases.
90
 
    '''
91
 
    daemon = None
92
 
    daemon_log = None
93
 
    device = None
94
 
 
95
 
    client = None
96
 
    manager = None
97
 
 
98
 
    @classmethod
99
 
    def init(klass, logfile=None):
100
 
        '''start daemon and set up test environment'''
101
 
 
102
 
        if os.geteuid() != 0:
103
 
            print('this test suite needs to run as root', file=sys.stderr)
104
 
            sys.exit(0)
105
 
 
106
 
        # run from local build tree if we are in one, otherwise use system instance
107
 
        daemon_path = os.path.join(srcdir, 'src', 'udisksd')
108
 
        if (os.access (daemon_path, os.X_OK)):
109
 
            print('Testing binaries from local build tree')
110
 
            klass.check_build_tree_config()
111
 
        else:
112
 
            print('Testing installed system binaries')
113
 
            daemon_path = None
114
 
            for l in open('/usr/share/dbus-1/system-services/org.freedesktop.UDisks2.service'):
115
 
                if l.startswith('Exec='):
116
 
                    daemon_path = l.split('=', 1)[1].split()[0]
117
 
                    break
118
 
            assert daemon_path, 'could not determine daemon path from D-BUS .service file'
119
 
 
120
 
        print('daemon path: ' + daemon_path)
121
 
 
122
 
        klass.device = klass.setup_vdev()
123
 
 
124
 
        # start polkit and udisks on a private DBus
125
 
        klass.dbus = Gio.TestDBus()
126
 
        klass.dbus.up()
127
 
        os.environ['DBUS_SYSTEM_BUS_ADDRESS'] = klass.dbus.get_bus_address()
128
 
        # do not try to communicate with the current desktop session; this will
129
 
        # confuse it, as it cannot see this D-BUS instance
130
 
        try:
131
 
            del os.environ['DISPLAY']
132
 
        except KeyError:
133
 
            pass
134
 
        if logfile:
135
 
            klass.daemon_log = open(logfile, 'w')
136
 
        else:
137
 
            klass.daemon_log = tempfile.TemporaryFile()
138
 
        klass.daemon = subprocess.Popen([daemon_path],
139
 
            stdout=klass.daemon_log, stderr=subprocess.STDOUT)
140
 
        assert klass.daemon.pid, 'daemon failed to start'
141
 
 
142
 
        atexit.register(klass.cleanup)
143
 
 
144
 
        # wait until the daemon has started up
145
 
        timeout = 10
146
 
        while klass.manager is None and timeout > 0:
147
 
            time.sleep(0.2)
148
 
            klass.client = UDisks.Client.new_sync(None)
149
 
            assert klass.client != None
150
 
            klass.manager = klass.client.get_manager()
151
 
            timeout -= 1
152
 
        assert klass.manager, 'daemon failed to start'
153
 
 
154
 
        klass.sync()
155
 
 
156
 
    @classmethod
157
 
    def cleanup(klass):
158
 
        '''stop daemon again and clean up test environment'''
159
 
 
160
 
        subprocess.call(['umount', klass.device], stderr=subprocess.PIPE) # if a test failed
161
 
 
162
 
        os.kill(klass.daemon.pid, signal.SIGTERM)
163
 
        os.wait()
164
 
        klass.daemon = None
165
 
 
166
 
        klass.teardown_vdev(klass.device)
167
 
        klass.device = None
168
 
 
169
 
        del os.environ['DBUS_SYSTEM_BUS_ADDRESS']
170
 
        klass.dbus.down()
171
 
 
172
 
    @classmethod
173
 
    def sync(klass):
174
 
        '''Wait until pending events finished processing.
175
 
        
176
 
        This should only be called for situations where we genuinely have an
177
 
        asynchronous response, like invoking a CLI program and waiting for
178
 
        udev/udisks to catch up on the change events.
179
 
        '''
180
 
        subprocess.call(['udevadm', 'settle'])
181
 
        context = GLib.main_context_default()
182
 
        timeout = 100
183
 
        # wait until all GDBus events have been processed
184
 
        while context.pending() and timeout > 0:
185
 
            klass.client.settle()
186
 
            time.sleep(0.1)
187
 
            timeout -= 1
188
 
        if timeout <= 0:
189
 
            sys.stderr.write('[wait timeout!] ')
190
 
            sys.stderr.flush()
191
 
        klass.client.settle()
192
 
 
193
 
    @classmethod
194
 
    def sync_workaround(klass):
195
 
        '''Wait until pending events finished processing (bug workaround).
196
 
        
197
 
        This should be called for race conditions in the D-BUS API which cause
198
 
        properties to not be up to date yet when a method call finishes. Those
199
 
        should eventually get fixed properly, but it's unnerving to have the
200
 
        tests fail on them when you are working on something else.
201
 
 
202
 
        This sync is not done if running with --no-workarounds.
203
 
        '''
204
 
        if workaround_syncs:
205
 
            klass.sync()
206
 
 
207
 
    @classmethod
208
 
    def zero_device(klass):
209
 
        subprocess.call(['dd', 'if=/dev/zero', 'of='+klass.device, 'bs=10M'],
210
 
                stderr=subprocess.PIPE)
211
 
        time.sleep(0.5)
212
 
        klass.sync()
213
 
 
214
 
    @classmethod
215
 
    def devname(klass, partition=None):
216
 
        '''Get name of test device or one of its partitions'''
217
 
 
218
 
        if partition:
219
 
            if klass.device[-1].isdigit():
220
 
                return klass.device + 'p' + str(partition)
221
 
            else:
222
 
                return klass.device + str(partition)
223
 
        else:
224
 
            return klass.device
225
 
 
226
 
    @classmethod
227
 
    def udisks_block(klass, partition=None):
228
 
        '''Get UDisksBlock object for test device or partition'''
229
 
 
230
 
        assert klass.client
231
 
        devname = klass.devname(partition)
232
 
        dev_t = os.stat(devname).st_rdev
233
 
        block = klass.client.get_block_for_dev(dev_t)
234
 
        assert block, 'did not find an UDisksBlock object for %s' % devname
235
 
        return block
236
 
 
237
 
    @classmethod
238
 
    def udisks_filesystem(klass, partition=None):
239
 
        '''Get UDisksFilesystem object for test device or partition
240
 
        
241
 
        Return None if there is no file system on that device.
242
 
        '''
243
 
        block = klass.udisks_block(partition)
244
 
        return klass.client.get_object(block.get_object_path()).get_property('filesystem')
245
 
 
246
 
    @classmethod
247
 
    def blkid(klass, partition=None, device=None):
248
 
        '''Call blkid and return dictionary of results.'''
249
 
 
250
 
        if not device:
251
 
            device = klass.devname(partition)
252
 
        result = {}
253
 
        cmd = subprocess.Popen(['blkid', '-p', '-o', 'udev', device], stdout=subprocess.PIPE)
254
 
        for l in cmd.stdout:
255
 
            (key, value) = l.decode('UTF-8').split('=', 1)
256
 
            result[key] = value.strip()
257
 
        assert cmd.wait() == 0
258
 
        return result
259
 
 
260
 
    @classmethod
261
 
    def is_mountpoint(klass, path):
262
 
        '''Check if given path is a mount point.'''
263
 
 
264
 
        return subprocess.call(['mountpoint', path], stdout=subprocess.PIPE) == 0
265
 
 
266
 
    @classmethod
267
 
    def mkfs(klass, type, label=None, partition=None):
268
 
        '''Create file system using mkfs.'''
269
 
 
270
 
        if type == 'minix':
271
 
            assert label is None, 'minix does not support labels'
272
 
 
273
 
        # work around mkswap not properly cleaning up an existing reiserfs
274
 
        # signature (mailed kzak about it)
275
 
        if type == 'swap':
276
 
            subprocess.check_call(['wipefs', '-a', klass.devname(partition)],
277
 
                    stdout=subprocess.PIPE)
278
 
 
279
 
        mkcmd =     { 'swap': 'mkswap',
280
 
                    }
281
 
        label_opt = { 'vfat': '-n', 
282
 
                      'reiserfs': '-l',
283
 
                    }
284
 
        extra_opt = { 'vfat': [ '-I', '-F', '32'],
285
 
                      'swap': ['-f'],
286
 
                      'xfs': ['-f'], # XFS complains if there's an existing FS, so force
287
 
                      'ext2': ['-F'], # ext* complains about using entire device, so force
288
 
                      'ext3': ['-F'],
289
 
                      'ext4': ['-F'],
290
 
                      'ntfs': ['-F'],
291
 
                      'reiserfs': ['-ff'],
292
 
                    }
293
 
 
294
 
        cmd = [mkcmd.get(type, 'mkfs.' + type)] + extra_opt.get(type, [])
295
 
        if label:
296
 
            cmd += [label_opt.get(type, '-L'), label]
297
 
        cmd.append(klass.devname(partition))
298
 
 
299
 
        subprocess.check_call(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
300
 
 
301
 
        # kernel/udev generally detect those changes itself, but do not quite
302
 
        # tell us when they are done; so do a little kludge here to know how
303
 
        # long we need to wait
304
 
        subprocess.call(['udevadm', 'trigger', '--action=change',
305
 
            '--sysname-match=' + os.path.basename(klass.devname(partition))])
306
 
        klass.sync()
307
 
 
308
 
    @classmethod
309
 
    def fs_create(klass, partition, type, options):
310
 
        '''Create file system using udisks.'''
311
 
 
312
 
        block = klass.udisks_block(partition)
313
 
        block.call_format_sync(type, options, None)
314
 
        klass.sync_workaround()
315
 
 
316
 
    @classmethod
317
 
    def retry_busy(klass, fn, *args):
318
 
        '''Call a function until it does not fail with "Busy".'''
319
 
 
320
 
        timeout = 10
321
 
        while timeout >= 0:
322
 
            try:
323
 
                return fn(*args)
324
 
            except GLib.GError as e:
325
 
                if not 'UDisks2.Error.DeviceBusy' in e.message:
326
 
                    raise
327
 
                sys.stderr.write('[busy] ')
328
 
                time.sleep(0.3)
329
 
                timeout -= 1
330
 
 
331
 
    @classmethod
332
 
    def check_build_tree_config(klass):
333
 
        '''Check configuration of build tree'''
334
 
 
335
 
        # read make variables
336
 
        make_vars = {}
337
 
        var_re = re.compile('^([a-zA-Z_]+) = (.*)$')
338
 
        make = subprocess.Popen(['make', '-p', '/dev/null'],
339
 
                stdout=subprocess.PIPE)
340
 
        for l in make.stdout:
341
 
            l = l.decode('UTF-8')
342
 
            m = var_re.match(l)
343
 
            if m:
344
 
                make_vars[m.group(1)] = m.group(2)
345
 
        make.wait()
346
 
 
347
 
        # expand make variables
348
 
        subst_re = re.compile('\${([a-zA-Z_]+)}')
349
 
        for (k, v) in make_vars.items():
350
 
            while True:
351
 
                m = subst_re.search(v)
352
 
                if m:
353
 
                    v = subst_re.sub(make_vars.get(m.group(1), ''), v)
354
 
                    make_vars[k] = v
355
 
                else:
356
 
                    break
357
 
 
358
 
        # check localstatedir
359
 
        for d in (os.path.join(make_vars['localstatedir'], 'run', 'udisks2'),
360
 
                os.path.join(make_vars['localstatedir'], 'lib', 'udisks2')):
361
 
            if not os.path.exists(d):
362
 
                sys.stderr.write('The directory %s does not exist; please create it before running these tests.\n' % d)
363
 
                sys.exit(0)
364
 
        
365
 
    @classmethod
366
 
    def setup_vdev(klass):
367
 
        '''create virtual test device
368
 
        
369
 
        It is zeroed out initially.
370
 
 
371
 
        Return the device path.
372
 
        '''
373
 
        # ensure that the scsi_debug module is loaded
374
 
        if os.path.isdir('/sys/module/scsi_debug'):
375
 
            sys.stderr.write('The scsi_debug module is already loaded; please remove before running this test.\n')
376
 
            sys.exit(1)
377
 
 
378
 
        assert subprocess.call(['modprobe', 'scsi_debug', 'dev_size_mb=%i' % (
379
 
            VDEV_SIZE/1048576)]) == 0, 'Failure to modprobe scsi_debug'
380
 
 
381
 
        # wait until all drives are created
382
 
        dirs = []
383
 
        while len(dirs) < 1:
384
 
            dirs = glob('/sys/bus/pseudo/drivers/scsi_debug/adapter*/host*/target*/*:*/block')
385
 
            time.sleep(0.1)
386
 
        assert len(dirs) == 1
387
 
 
388
 
        # determine the debug block devices
389
 
        devs = os.listdir(dirs[0])
390
 
        assert len(devs) == 1
391
 
        dev = '/dev/' + devs[0]
392
 
        assert os.path.exists(dev)
393
 
 
394
 
        # let's be 100% sure that we pick a virtual one
395
 
        assert open('/sys/block/%s/device/model' % devs[0]).read().strip() == 'scsi_debug'
396
 
 
397
 
        print('Set up test device: ' + dev)
398
 
        return dev
399
 
 
400
 
    @classmethod
401
 
    def teardown_vdev(klass, device):
402
 
        '''release and remove virtual test device'''
403
 
 
404
 
        klass.remove_device(device)
405
 
        assert subprocess.call(['rmmod', 'scsi_debug']) == 0, \
406
 
                'Failure to rmmod scsi_debug'
407
 
 
408
 
    @classmethod
409
 
    def remove_device(klass, device):
410
 
        '''remove virtual test device'''
411
 
 
412
 
        device = device.split('/')[-1]
413
 
        if os.path.exists('/sys/block/' + device):
414
 
            f = open('/sys/block/%s/device/delete' % device, 'w')
415
 
            f.write('1')
416
 
            f.close()
417
 
        while os.path.exists(device):
418
 
            time.sleep(0.1)
419
 
        klass.sync()
420
 
        time.sleep(0.5) # TODO
421
 
 
422
 
    @classmethod
423
 
    def readd_devices(klass):
424
 
        '''re-add virtual test devices after removal'''
425
 
 
426
 
        scan_files = glob('/sys/bus/pseudo/devices/adapter*/host*/scsi_host/host*/scan')
427
 
        assert len(scan_files) > 0
428
 
        for f in scan_files:
429
 
            open(f, 'w').write('- - -\n')
430
 
        while not os.path.exists(klass.device):
431
 
            time.sleep(0.1)
432
 
        time.sleep(0.5)
433
 
        klass.sync()
434
 
 
435
 
# ----------------------------------------------------------------------------
436
 
 
437
 
class Manager(UDisksTestCase):
438
 
    '''UDisksManager operations'''
439
 
 
440
 
    def test_version(self):
441
 
        '''daemon version'''
442
 
 
443
 
        self.assertTrue(self.manager.get_property('version')[0].isdigit())
444
 
 
445
 
    def test_loop_rw(self):
446
 
        '''loop device R/W'''
447
 
 
448
 
        with tempfile.NamedTemporaryFile() as f:
449
 
            f.truncate(100000000)
450
 
            fd_list = Gio.UnixFDList.new_from_array([f.fileno()])
451
 
 
452
 
            (path, out_fd_list) = self.manager.call_loop_setup_sync(
453
 
                GLib.Variant('h', 0), # fd index
454
 
                no_options,
455
 
                fd_list,
456
 
                None)
457
 
            self.client.settle()
458
 
 
459
 
            obj = self.client.get_object(path)
460
 
            loop = obj.get_property('loop')
461
 
            block = obj.get_property('block')
462
 
            self.assertNotEqual(block, None)
463
 
            self.assertNotEqual(loop, None)
464
 
            self.assertEqual(obj.get_property('filesystem'), None)
465
 
 
466
 
            try:
467
 
                self.assertEqual(loop.get_property('backing-file'), f.name)
468
 
 
469
 
                options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', 'foo')})
470
 
                block.call_format_sync('ext2', options, None)
471
 
                self.client.settle()
472
 
                self.assertNotEqual(obj.get_property('filesystem'), None)
473
 
 
474
 
                self.assertEqual(block.get_property('id-label'), 'foo')
475
 
                self.assertEqual(block.get_property('id-usage'), 'filesystem')
476
 
                self.assertEqual(block.get_property('id-type'), 'ext2')
477
 
            finally:
478
 
                loop.call_delete_sync(no_options, None)
479
 
 
480
 
    def test_loop_ro(self):
481
 
        '''loop device R/O'''
482
 
 
483
 
        with tempfile.NamedTemporaryFile() as f:
484
 
            f.truncate(100000000)
485
 
            fd_list = Gio.UnixFDList.new_from_array([f.fileno()])
486
 
 
487
 
            (path, out_fd_list) = self.manager.call_loop_setup_sync(
488
 
                GLib.Variant('h', 0), # fd index
489
 
                GLib.Variant('a{sv}', {'read-only': GLib.Variant('b', True)}),
490
 
                fd_list,
491
 
                None)
492
 
            self.client.settle()
493
 
 
494
 
            obj = self.client.get_object(path)
495
 
            loop = obj.get_property('loop')
496
 
            block = obj.get_property('block')
497
 
            self.assertNotEqual(block, None)
498
 
            self.assertNotEqual(loop, None)
499
 
            self.assertEqual(obj.get_property('filesystem'), None)
500
 
 
501
 
            try:
502
 
                self.assertEqual(loop.get_property('backing-file'), f.name)
503
 
 
504
 
                # can't format due to permission error
505
 
                self.assertRaises(GLib.GError, block.call_format_sync, 'ext2',
506
 
                        no_options, None)
507
 
 
508
 
                self.assertEqual(block.get_property('id-label'), '')
509
 
                self.assertEqual(block.get_property('id-usage'), '')
510
 
                self.assertEqual(block.get_property('id-type'), '')
511
 
            finally:
512
 
                self.client.settle()
513
 
                loop.call_delete_sync(no_options, None)
514
 
 
515
 
# ----------------------------------------------------------------------------
516
 
 
517
 
class Drive(UDisksTestCase):
518
 
    '''UDisksDrive'''
519
 
 
520
 
    def setUp(self):
521
 
        self.drive = self.client.get_drive_for_block(self.udisks_block())
522
 
        self.assertNotEqual(self.drive, None)
523
 
 
524
 
    def test_properties(self):
525
 
        '''properties of UDisksDrive object'''
526
 
 
527
 
        self.assertEqual(self.drive.get_property('model'), 'scsi_debug')
528
 
        self.assertEqual(self.drive.get_property('vendor'), 'Linux')
529
 
        self.assertAlmostEqual(self.drive.get_property('size')/1.e6, VDEV_SIZE/1.e6, 0)
530
 
        self.assertEqual(self.drive.get_property('media-available'), True)
531
 
        self.assertEqual(self.drive.get_property('optical'), False)
532
 
 
533
 
        self.assertNotEqual(len(self.drive.get_property('serial')), 0)
534
 
        self.assertNotEqual(len(self.drive.get_property('revision')), 0)
535
 
 
536
 
# ----------------------------------------------------------------------------
537
 
 
538
 
class FS(UDisksTestCase):
539
 
    '''Test detection of all supported file systems'''
540
 
 
541
 
    def setUp(self):
542
 
        self.workdir = tempfile.mkdtemp()
543
 
        self.block = self.udisks_block()
544
 
        self.assertNotEqual(self.block, None)
545
 
 
546
 
    def tearDown(self):
547
 
        if subprocess.call(['umount', self.device], stderr=subprocess.PIPE) == 0:
548
 
            sys.stderr.write('[cleanup unmount] ')
549
 
        shutil.rmtree (self.workdir)
550
 
 
551
 
    def test_zero(self):
552
 
        '''properties of zeroed out device'''
553
 
 
554
 
        self.zero_device()
555
 
        self.assertEqual(self.block.get_property('device'), self.device)
556
 
        self.assertTrue('Linux_scsi_debug' in self.block.get_property('drive'))
557
 
        self.assertEqual(self.block.get_property('hint-system'), True)
558
 
        self.assertEqual(self.block.get_property('id-label'), '')
559
 
        self.assertEqual(self.block.get_property('id-usage'), '')
560
 
        self.assertEqual(self.block.get_property('id-type'), '')
561
 
        self.assertEqual(self.block.get_property('id-uuid'), '')
562
 
        self.assertAlmostEqual(self.block.get_property('size')/1.e6, VDEV_SIZE/1.e6, 0)
563
 
        obj = self.client.get_object(self.block.get_object_path())
564
 
        self.assertEqual(obj.get_property('filesystem'), None)
565
 
        self.assertEqual(obj.get_property('partition'), None)
566
 
        self.assertEqual(obj.get_property('partition-table'), None)
567
 
 
568
 
    def test_ext2(self):
569
 
        '''fs: ext2'''
570
 
        self._do_fs_check('ext2')
571
 
 
572
 
    def test_ext3(self):
573
 
        '''fs: ext3'''
574
 
        self._do_fs_check('ext3')
575
 
 
576
 
    def test_ext4(self):
577
 
        '''fs: ext4'''
578
 
        self._do_fs_check('ext4')
579
 
 
580
 
    def test_btrfs(self):
581
 
        '''fs: btrfs'''
582
 
        self._do_fs_check('btrfs')
583
 
 
584
 
    def test_minix(self):
585
 
        '''fs: minix'''
586
 
        self._do_fs_check('minix')
587
 
 
588
 
    def test_xfs(self):
589
 
        '''fs: XFS'''
590
 
        self._do_fs_check('xfs')
591
 
 
592
 
    def test_ntfs(self):
593
 
        '''fs: NTFS'''
594
 
        self._do_fs_check('ntfs')
595
 
 
596
 
    def test_vfat(self):
597
 
        '''fs: FAT'''
598
 
        self._do_fs_check('vfat')
599
 
 
600
 
    def test_reiserfs(self):
601
 
        '''fs: reiserfs'''
602
 
        self._do_fs_check('reiserfs')
603
 
 
604
 
    def test_swap(self):
605
 
        '''fs: swap'''
606
 
        self._do_fs_check('swap')
607
 
 
608
 
    def test_nilfs2(self):
609
 
        '''fs: nilfs2'''
610
 
        self._do_fs_check('nilfs2')
611
 
 
612
 
    def test_empty(self):
613
 
        '''fs: empty'''
614
 
 
615
 
        self.mkfs('ext4', 'foo')
616
 
        block = self.udisks_block()
617
 
        self.assertEqual(block.get_property('id-usage'), 'filesystem')
618
 
        self.assertEqual(block.get_property('id-type'), 'ext4')
619
 
        self.assertEqual(block.get_property('id-label'), 'foo')
620
 
        self.assertNotEqual(self.udisks_filesystem(), None)
621
 
 
622
 
        self.fs_create(None, 'empty', no_options)
623
 
 
624
 
        self.assertEqual(block.get_property('id-usage'), '')
625
 
        self.assertEqual(block.get_property('id-type'), '')
626
 
        self.assertEqual(block.get_property('id-label'), '')
627
 
        self.assertEqual(self.udisks_filesystem(), None)
628
 
 
629
 
    def test_create_fs_unknown_type(self):
630
 
        '''Format() with unknown type'''
631
 
 
632
 
        try:
633
 
            self.fs_create(None, 'bogus', no_options)
634
 
            self.fail('Expected failure for bogus file system')
635
 
        except GLib.GError as e:
636
 
            self.assertTrue('UDisks2.Error.NotSupported' in e.message)
637
 
            self.assertTrue('type bogus' in e.message)
638
 
 
639
 
    def test_create_fs_unsupported_label(self):
640
 
        '''Format() with unsupported label'''
641
 
 
642
 
        options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', 'foo')})
643
 
        try:
644
 
            self.fs_create(None, 'minix', options)
645
 
            self.fail('Expected failure for unsupported label')
646
 
        except GLib.GError as e:
647
 
            self.assertTrue('UDisks2.Error.NotSupported' in e.message)
648
 
 
649
 
    def test_force_removal(self):
650
 
        '''fs: forced removal'''
651
 
 
652
 
        # create a fs and mount it
653
 
        self.mkfs('ext4', 'udiskstest')
654
 
        fs = self.udisks_filesystem()
655
 
        mount_path = fs.call_mount_sync(no_options, None)
656
 
        self.assertTrue(mount_path.endswith('udiskstest'))
657
 
        self.assertTrue('/media/' in mount_path)
658
 
        self.assertTrue(self.is_mountpoint(mount_path))
659
 
 
660
 
        dev_t = os.stat(self.devname()).st_rdev
661
 
 
662
 
        # removal should clean up mounts
663
 
        self.remove_device(self.device)
664
 
        self.assertFalse(os.path.exists(mount_path))
665
 
        self.assertEqual(self.client.get_block_for_dev(dev_t), None)
666
 
 
667
 
        # after putting it back, it should be mountable again
668
 
        self.readd_devices()
669
 
        fs = self.udisks_filesystem()
670
 
        self.assertEqual(fs.get_property('mount-points'), [])
671
 
 
672
 
        mount_path = fs.call_mount_sync(no_options, None)
673
 
        self.assertTrue(mount_path.endswith('udiskstest'))
674
 
        self.assertTrue('/media/' in mount_path)
675
 
        self.assertTrue(self.is_mountpoint(mount_path))
676
 
        self.client.settle()
677
 
        self.assertEqual(fs.get_property('mount-points'), [mount_path])
678
 
 
679
 
        self.retry_busy(fs.call_unmount_sync, no_options, None)
680
 
        self.client.settle()
681
 
        self.assertEqual(fs.get_property('mount-points'), [])
682
 
 
683
 
    def _do_fs_check(self, type):
684
 
        '''Run checks for a particular file system.'''
685
 
 
686
 
        if type != 'swap' and subprocess.call(['which', 'mkfs.' + type],
687
 
                stdout=subprocess.PIPE) != 0:
688
 
            sys.stderr.write('[no mkfs.%s, skip] ' % type)
689
 
 
690
 
            # check correct D-Bus exception
691
 
            try:
692
 
                self.fs_create(None, type, no_options)
693
 
                self.fail('Expected failure for missing mkfs.' + type)
694
 
            except GLib.GError as e:
695
 
                self.assertTrue('UDisks2.Error.Failed' in e.message)
696
 
            return
697
 
 
698
 
        # do checks with command line tools (mkfs/mount/umount)
699
 
        sys.stderr.write('[cli] ')
700
 
        sys.stderr.flush()
701
 
 
702
 
        self._do_cli_check(type)
703
 
        if type != 'minix':
704
 
            self._do_cli_check(type, 'test%stst' % type)
705
 
 
706
 
        # put a different fs here instead of zeroing, so that we verify that
707
 
        # udisks overrides existing FS (e. g. XFS complains then), and does not
708
 
        # leave traces of other FS around
709
 
        if type == 'ext3':
710
 
            self.mkfs('swap')
711
 
        else:
712
 
            self.mkfs('ext3')
713
 
 
714
 
        # do checks with udisks operations
715
 
        sys.stderr.write('[ud] ')
716
 
        self._do_udisks_check(type)
717
 
        if type != 'minix':
718
 
            self._do_udisks_check(type, 'test%stst' % type)
719
 
            # also test fs_create with an empty label
720
 
            self._do_udisks_check(type, '')
721
 
 
722
 
    def _do_cli_check(self, type, label=None):
723
 
        '''udisks correctly picks up file system changes from command line tools'''
724
 
 
725
 
        self.mkfs(type, label)
726
 
 
727
 
        block = self.udisks_block()
728
 
 
729
 
        self.assertEqual(block.get_property('id-usage'), (type == 'swap') and 'other' or 'filesystem')
730
 
 
731
 
        self.assertEqual(block.get_property('id-type'), type)
732
 
        self.assertEqual(block.get_property('id-label'), label or '')
733
 
        self.assertEqual(block.get_property('hint-name'), '')
734
 
        if type != 'minix':
735
 
            self.assertEqual(block.get_property('id-uuid'), self.blkid()['ID_FS_UUID'])
736
 
 
737
 
        obj = self.client.get_object(self.block.get_object_path())
738
 
        self.assertEqual(obj.get_property('partition'), None)
739
 
        self.assertEqual(obj.get_property('partition-table'), None)
740
 
 
741
 
        fs = obj.get_property('filesystem')
742
 
        if type == 'swap':
743
 
            self.assertEqual(fs, None)
744
 
        else:
745
 
            self.assertNotEqual(fs, None)
746
 
 
747
 
        if type == 'swap':
748
 
            return
749
 
 
750
 
        # mount it
751
 
        if type == 'ntfs' and subprocess.call(['which', 'mount.ntfs-3g'],
752
 
                stdout=subprocess.PIPE) == 0:
753
 
            # prefer mount.ntfs-3g if we have it (on Debian; Ubuntu
754
 
            # defaults to ntfs-3g if installed); TODO: check other distros
755
 
            mount_prog = 'mount.ntfs-3g'
756
 
        else:
757
 
            mount_prog = 'mount'
758
 
        ret = subprocess.call([mount_prog, self.device, self.workdir])
759
 
        if ret == 32:
760
 
            # missing fs driver
761
 
            sys.stderr.write('[missing kernel driver, skip] ')
762
 
            return
763
 
        self.assertEqual(ret, 0)
764
 
 
765
 
        time.sleep(0.5)
766
 
        self.sync()
767
 
        self.assertEqual(fs.get_property('mount-points'), [self.workdir])
768
 
 
769
 
        # unmount it
770
 
        subprocess.call(['umount', self.workdir])
771
 
        self.sync()
772
 
        self.assertEqual(fs.get_property('mount-points'), [])
773
 
 
774
 
    def _do_udisks_check(self, type, label=None):
775
 
        '''udisks API correctly changes file system'''
776
 
 
777
 
        # create fs 
778
 
        if label is not None:
779
 
            options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', label)})
780
 
        else:
781
 
            options = no_options
782
 
        self.fs_create(None, type, options)
783
 
 
784
 
        # properties
785
 
        id = self.blkid()
786
 
        self.assertEqual(id['ID_FS_USAGE'], type == 'swap' and 'other' or 'filesystem')
787
 
        self.assertEqual(id['ID_FS_TYPE'], type)
788
 
        self.assertEqual(id.get('ID_FS_LABEL', ''), label or '')
789
 
 
790
 
        block = self.udisks_block()
791
 
        self.assertEqual(block.get_property('id-usage'), (type == 'swap') and 'other' or 'filesystem')
792
 
        self.assertEqual(block.get_property('id-type'), type)
793
 
        self.assertEqual(block.get_property('id-label'), label or '')
794
 
 
795
 
        if type == 'swap':
796
 
            return
797
 
 
798
 
        obj = self.client.get_object(self.block.get_object_path())
799
 
        self.assertEqual(obj.get_property('partition'), None)
800
 
        self.assertEqual(obj.get_property('partition-table'), None)
801
 
 
802
 
        fs = self.udisks_filesystem()
803
 
        self.assertNotEqual(fs, None, 'no Filesystem interface for test device')
804
 
        self.assertEqual(fs.get_property('mount-points'), [])
805
 
 
806
 
        # mount
807
 
        mount_path = fs.call_mount_sync(no_options, None)
808
 
 
809
 
        self.assertTrue(mount_path.startswith('/run/media/'), mount_path)
810
 
        if label:
811
 
            self.assertTrue(mount_path.endswith(label))
812
 
 
813
 
        self.sync()
814
 
        self.assertEqual(fs.get_property('mount-points'), [mount_path])
815
 
        self.assertTrue(self.is_mountpoint(mount_path))
816
 
 
817
 
        # no ownership taken, should be root owned
818
 
        st = os.stat(mount_path)
819
 
        self.assertEqual((st.st_uid, st.st_gid), (0, 0))
820
 
 
821
 
        self._do_file_perms_checks(type, mount_path)
822
 
 
823
 
        # unmount
824
 
        self.retry_busy(fs.call_unmount_sync, no_options, None)
825
 
        self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
826
 
        self.assertEqual(fs.get_property('mount-points'), [mount_path])
827
 
 
828
 
        # create fs with taking ownership (daemon:mail == 1:8)
829
 
        #if supports_unix_owners:
830
 
        #    options.append('take_ownership_uid=1')
831
 
        #    options.append('take_ownership_gid=8')
832
 
        #    self.fs_create(None, type, options)
833
 
        #    mount_path = iface.FilesystemMount('', [])
834
 
        #    st = os.stat(mount_path)
835
 
        #    self.assertEqual((st.st_uid, st.st_gid), (1, 8))
836
 
        #    self.retry_busy(self.partition_iface().FilesystemUnmount, [])
837
 
        #    self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
838
 
 
839
 
        # change label
840
 
        supported = True
841
 
        l = 'n"a\m\\"e' + type
842
 
        if type == 'vfat':
843
 
            # VFAT does not support some characters
844
 
            self.assertRaises(GLib.GError, fs.call_set_label_sync, l, 
845
 
                    no_options, None)
846
 
            l = "n@a$me"
847
 
        try:
848
 
            fs.call_set_label_sync(l, no_options, None)
849
 
        except GLib.GError as e:
850
 
            if 'UDisks2.Error.NotSupported' in e.message:
851
 
                # these fses are known to not support relabeling
852
 
                self.assertTrue(type in ['minix', 'btrfs'])
853
 
                supported = False
854
 
            else:
855
 
                raise
856
 
 
857
 
        if supported:
858
 
            block = self.udisks_block()
859
 
            blkid_label = self.blkid().get('ID_FS_LABEL_ENC', '').replace('\\x22', '"').replace(
860
 
                    '\\x5c', '\\').replace('\\x24', '$')
861
 
            self.sync_workaround()
862
 
            if type == 'vfat':
863
 
                # EXFAIL: often (but not always) the label appears in all upper case
864
 
                self.assertEqual(blkid_label.upper(), l.upper())
865
 
                self.assertEqual(block.get_property('id-label').upper(), l.upper())
866
 
            else:
867
 
                self.assertEqual(blkid_label, l)
868
 
                self.assertEqual(block.get_property('id-label'), l)
869
 
 
870
 
            # test setting empty label
871
 
            fs.call_set_label_sync('', no_options, None)
872
 
            self.sync_workaround()
873
 
            self.assertEqual(self.blkid().get('ID_FS_LABEL_ENC', ''), '')
874
 
            self.assertEqual(block.get_property('id-label'), '')
875
 
 
876
 
        # check fs - Not implemented in udisks yet
877
 
        #self.assertEqual(iface.FilesystemCheck([]), True)
878
 
 
879
 
    def _do_file_perms_checks(self, type, mount_point):
880
 
        '''Check for permissions for data files and executables.
881
 
 
882
 
        This particularly checks sane and useful permissions on non-Unix file
883
 
        systems like vfat.
884
 
        '''
885
 
        if type in BROKEN_PERMISSIONS_FS:
886
 
            return
887
 
 
888
 
        f = os.path.join(mount_point, 'simpledata.txt')
889
 
        open(f, 'w').close()
890
 
        self.assertTrue(os.access(f, os.R_OK))
891
 
        self.assertTrue(os.access(f, os.W_OK))
892
 
        self.assertFalse(os.access(f, os.X_OK))
893
 
 
894
 
        f = os.path.join(mount_point, 'simple.exe')
895
 
        shutil.copy('/bin/bash', f)
896
 
        self.assertTrue(os.access(f, os.R_OK))
897
 
        self.assertTrue(os.access(f, os.W_OK))
898
 
        self.assertTrue(os.access(f, os.X_OK))
899
 
 
900
 
        os.mkdir(os.path.join(mount_point, 'subdir'))
901
 
        f = os.path.join(mount_point, 'subdir', 'subdirdata.txt')
902
 
        open(f, 'w').close()
903
 
        self.assertTrue(os.access(f, os.R_OK))
904
 
        self.assertTrue(os.access(f, os.W_OK))
905
 
        self.assertFalse(os.access(f, os.X_OK))
906
 
 
907
 
        f = os.path.join(mount_point, 'subdir', 'subdir.exe')
908
 
        shutil.copy('/bin/bash', f)
909
 
        self.assertTrue(os.access(f, os.R_OK))
910
 
        self.assertTrue(os.access(f, os.W_OK))
911
 
        self.assertTrue(os.access(f, os.X_OK))
912
 
 
913
 
## ----------------------------------------------------------------------------
914
 
 
915
 
class Smart(UDisksTestCase):
916
 
    '''Check SMART operation.'''
917
 
 
918
 
    def test_sda(self):
919
 
        '''SMART status of first internal hard disk
920
 
        
921
 
        This is a best-effort readonly test.
922
 
        '''
923
 
        hd = '/dev/sda'
924
 
 
925
 
        if not os.path.exists(hd):
926
 
            sys.stderr.write('[skip] ')
927
 
            return
928
 
 
929
 
        has_smart = subprocess.call(['skdump', '--can-smart', hd],
930
 
                stdout=subprocess.PIPE, stderr=subprocess.STDOUT) == 0
931
 
 
932
 
        block = self.client.get_block_for_dev(os.stat(hd).st_rdev)
933
 
        self.assertNotEqual(block, None)
934
 
        drive = self.client.get_drive_for_block(block)
935
 
        ata = self.client.get_object(drive.get_object_path()).get_property('drive-ata')
936
 
        self.assertEqual(ata != None, has_smart)
937
 
 
938
 
        if has_smart:
939
 
            sys.stderr.write('[avail] ')
940
 
            self.assertEqual(ata.get_property('smart-supported'), True)
941
 
            self.assertEqual(ata.get_property('smart-enabled'), True)
942
 
 
943
 
            # wait for SMART data to be read
944
 
            while ata.get_property('smart-updated') == 0:
945
 
                sys.stderr.write('[wait for data] ')
946
 
                time.sleep(0.5)
947
 
 
948
 
            # this is of course not truly correct for a test suite, but let's
949
 
            # consider it a courtesy for developers :-)
950
 
            self.assertEqual(ata.get_property('smart-failing'), False)
951
 
            self.assertTrue(ata.get_property('smart-selftest-status') in ['success', 'inprogress'])
952
 
        else:
953
 
            sys.stderr.write('[N/A] ')
954
 
 
955
 
 
956
 
# ----------------------------------------------------------------------------
957
 
 
958
 
class Luks(UDisksTestCase):
959
 
    '''Check LUKS.'''
960
 
 
961
 
    def tearDown(self):
962
 
        '''clean up behind failed test cases'''
963
 
 
964
 
        crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
965
 
        if crypt_obj:
966
 
            encrypted = crypt_obj.get_property('encrypted')
967
 
            if encrypted:
968
 
                try:
969
 
                    encrypted.call_lock_sync(no_options, None)
970
 
                    sys.stderr.write('[cleanup lock] ')
971
 
                except GLib.GError:
972
 
                    pass
973
 
 
974
 
    # needs to run before the other tests
975
 
    def test_0_create_teardown(self):
976
 
        '''LUKS create/teardown'''
977
 
 
978
 
        self.fs_create(None, 'ext4', GLib.Variant('a{sv}', {
979
 
            'encrypt.passphrase': GLib.Variant('s', 's3kr1t'),
980
 
            'label': GLib.Variant('s', 'treasure'),
981
 
            }))
982
 
 
983
 
        try:
984
 
            block = self.udisks_block()
985
 
            obj = self.client.get_object(block.get_object_path())
986
 
            self.assertEqual(obj.get_property('filesystem'), None)
987
 
            encrypted = obj.get_property('encrypted')
988
 
            self.assertNotEqual(encrypted, None)
989
 
 
990
 
            # check crypted device info
991
 
            self.assertEqual(block.get_property('id-type'), 'crypto_LUKS')
992
 
            self.assertEqual(block.get_property('id-usage'), 'crypto')
993
 
            self.assertEqual(block.get_property('id-label'), '')
994
 
            self.assertEqual(block.get_property('id-uuid'), self.blkid()['ID_FS_UUID'])
995
 
            self.assertEqual(block.get_property('device'), self.devname())
996
 
 
997
 
            # check whether we can lock/unlock; we also need this to get the
998
 
            # cleartext device
999
 
            encrypted.call_lock_sync(no_options, None)
1000
 
            self.assertRaises(GLib.GError, encrypted.call_lock_sync, 
1001
 
                    no_options, None)
1002
 
            
1003
 
            # wrong password
1004
 
            self.assertRaises(GLib.GError, encrypted.call_unlock_sync, 
1005
 
                    'h4ckpassword', no_options, None)
1006
 
            # right password
1007
 
            clear_path = encrypted.call_unlock_sync('s3kr1t', 
1008
 
                    no_options, None)
1009
 
 
1010
 
            # check cleartext device info
1011
 
            clear_obj = self.client.get_object(clear_path)
1012
 
            self.assertEqual(clear_obj.get_property('encrypted'), None)
1013
 
            clear_block = clear_obj.get_property('block')
1014
 
            self.assertEqual(clear_block.get_property('id-type'), 'ext4')
1015
 
            self.assertEqual(clear_block.get_property('id-usage'), 'filesystem')
1016
 
            self.assertEqual(clear_block.get_property('id-label'), 'treasure')
1017
 
            self.assertNotEqual(clear_block.get_property('crypto-backing-device'), None)
1018
 
            clear_dev = clear_block.get_property('device')
1019
 
            self.assertNotEqual(clear_dev, None)
1020
 
            self.assertEqual(clear_block.get_property('id-uuid'),
1021
 
                    self.blkid(device=clear_dev)['ID_FS_UUID'])
1022
 
 
1023
 
            clear_fs = clear_obj.get_property('filesystem')
1024
 
            self.assertEqual(clear_fs.get_property('mount-points'), [])
1025
 
 
1026
 
            # check that we do not leak key information
1027
 
            udev_dump = subprocess.Popen(['udevadm', 'info', '--export-db'],
1028
 
                    stdout=subprocess.PIPE)
1029
 
            out = udev_dump.communicate()[0]
1030
 
            self.assertFalse(b's3kr1t' in out, 'password in udev properties')
1031
 
            self.assertFalse(b'essiv:sha' in out, 'key information in udev properties')
1032
 
 
1033
 
        finally:
1034
 
            # tear down cleartext device
1035
 
            encrypted.call_lock_sync(no_options, None)
1036
 
            self.assertFalse(os.path.exists(clear_dev))
1037
 
 
1038
 
    def test_luks_mount(self):
1039
 
        '''LUKS mount/unmount'''
1040
 
 
1041
 
        crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
1042
 
        encrypted = crypt_obj.get_property('encrypted')
1043
 
 
1044
 
        path = encrypted.call_unlock_sync('s3kr1t', 
1045
 
                no_options, None)
1046
 
        self.client.settle()
1047
 
        obj = self.client.get_object(path)
1048
 
        fs = obj.get_property('filesystem')
1049
 
        self.assertNotEqual(fs, None)
1050
 
 
1051
 
        # mount
1052
 
        mount_path = fs.call_mount_sync(no_options, None)
1053
 
 
1054
 
        try:
1055
 
            self.assertTrue('/media/' in mount_path)
1056
 
            self.assertTrue(mount_path.endswith('treasure'))
1057
 
            self.assertTrue(self.is_mountpoint(mount_path))
1058
 
            self.client.settle()
1059
 
            self.assertEqual(fs.get_property('mount-points'), [mount_path])
1060
 
 
1061
 
            # can't lock, busy
1062
 
            try:
1063
 
                encrypted.call_lock_sync(no_options, None)
1064
 
                self.fail('Lock() unexpectedly succeeded on mounted file system')
1065
 
            except GLib.GError as e:
1066
 
                self.assertTrue('UDisks2.Error.Failed' in e.message)
1067
 
        finally:
1068
 
            # umount
1069
 
            self.retry_busy(fs.call_unmount_sync, no_options, None)
1070
 
            self.client.settle()
1071
 
            self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
1072
 
            self.assertEqual(fs.get_property('mount-points'), [])
1073
 
 
1074
 
            # lock
1075
 
            encrypted.call_lock_sync(no_options, None)
1076
 
            self.client.settle()
1077
 
            self.assertEqual(self.client.get_object(path), None)
1078
 
 
1079
 
    def test_luks_forced_removal(self):
1080
 
        '''LUKS forced removal'''
1081
 
 
1082
 
        # unlock and mount it
1083
 
        crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
1084
 
        path = crypt_obj.get_property('encrypted').call_unlock_sync('s3kr1t', 
1085
 
                no_options, None)
1086
 
        try:
1087
 
            fs = self.client.get_object(path).get_property('filesystem')
1088
 
            mount_path = fs.call_mount_sync(no_options, None)
1089
 
            self.assertTrue('/media/' in mount_path)
1090
 
            self.assertTrue(mount_path.endswith('treasure'))
1091
 
 
1092
 
            # removal should clean up mounts
1093
 
            self.remove_device(self.device)
1094
 
            self.assertFalse(os.path.exists(mount_path))
1095
 
            self.assertEqual(self.client.get_object(path), None)
1096
 
 
1097
 
            # after putting it back, it should be mountable again
1098
 
            self.readd_devices()
1099
 
            crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
1100
 
            path = crypt_obj.get_property('encrypted').call_unlock_sync('s3kr1t', 
1101
 
                    no_options, None)
1102
 
            self.client.settle()
1103
 
            fs = self.client.get_object(path).get_property('filesystem')
1104
 
            mount_path = fs.call_mount_sync(no_options, None)
1105
 
            self.assertTrue('/media/' in mount_path)
1106
 
            self.assertTrue(mount_path.endswith('treasure'))
1107
 
 
1108
 
            # umount
1109
 
            self.retry_busy(fs.call_unmount_sync, no_options, None)
1110
 
            self.client.settle()
1111
 
            self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
1112
 
            self.assertEqual(fs.get_property('mount-points'), [])
1113
 
        finally:
1114
 
            # lock
1115
 
            crypt_obj.get_property('encrypted').call_lock_sync(
1116
 
                    no_options, None)
1117
 
            self.client.settle()
1118
 
            self.assertEqual(self.client.get_object(path), None)
1119
 
 
1120
 
# ----------------------------------------------------------------------------
1121
 
 
1122
 
class Polkit(UDisksTestCase, test_polkitd.PolkitTestCase):
1123
 
    '''Check operation with polkit.'''
1124
 
 
1125
 
    def test_internal_fs_forbidden(self):
1126
 
        '''Create FS on internal drive (forbidden)'''
1127
 
 
1128
 
        self.start_polkitd(['org.freedesktop.udisks2.modify-device'])
1129
 
 
1130
 
        options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', 'polkitno')})
1131
 
        with self.assertRaises(GLib.GError) as cm:
1132
 
            self.fs_create(None, 'ext4', options)
1133
 
        self.assertTrue('UDisks2.Error.NotAuthorized' in cm.exception.message,
1134
 
                cm.exception.message)
1135
 
 
1136
 
        # did not actually do anything
1137
 
        block = self.udisks_block()
1138
 
        self.assertNotEqual(block.get_property('id-label'), 'polkitno')
1139
 
 
1140
 
    def test_internal_fs_allowed(self):
1141
 
        '''Create FS on internal drive (allowed)'''
1142
 
 
1143
 
        self.start_polkitd(['org.freedesktop.udisks2.modify-device-system',
1144
 
            'org.freedesktop.udisks2.modify-device'])
1145
 
 
1146
 
        options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', 'polkityes')})
1147
 
        self.fs_create(None, 'ext4', options)
1148
 
        block = self.udisks_block()
1149
 
        self.assertEqual(block.get_property('id-usage'), 'filesystem')
1150
 
        self.assertEqual(block.get_property('id-type'), 'ext4')
1151
 
        self.assertEqual(block.get_property('id-label'), 'polkityes')
1152
 
 
1153
 
# ----------------------------------------------------------------------------
1154
 
 
1155
 
if __name__ == '__main__':
1156
 
    argparser = argparse.ArgumentParser(description='udisks2 integration test suite')
1157
 
    argparser.add_argument('-l', '--log-file', dest='logfile',
1158
 
            help='write daemon log to a file')
1159
 
    argparser.add_argument('-w', '--no-workarounds',
1160
 
            action="store_true", default=False,
1161
 
            help='Disable workarounds for race conditions in the D-BUS API')
1162
 
    argparser.add_argument('testname', nargs='*',
1163
 
            help='name of test class or method (e. g. "Drive", "FS.test_ext2")')
1164
 
    args = argparser.parse_args()
1165
 
 
1166
 
    workaround_syncs = not args.no_workarounds
1167
 
 
1168
 
    UDisksTestCase.init(logfile=args.logfile)
1169
 
    if args.testname:
1170
 
        tests = unittest.TestLoader().loadTestsFromNames(args.testname,
1171
 
                __import__('__main__'))
1172
 
    else:
1173
 
        tests = unittest.TestLoader().loadTestsFromName('__main__')
1174
 
    if unittest.TextTestRunner(verbosity=2).run(tests).wasSuccessful():
1175
 
        sys.exit(0)
1176
 
    else:
1177
 
        sys.exit(1)
1178