~ubuntu-multiseat/ubuntu/trusty/udisks2/full-logind-support

« back to all changes in this revision

Viewing changes to debian/local/integration-test

  • Committer: Package Import Robot
  • Author(s): Martin Pitt
  • Date: 2012-06-13 17:01:30 UTC
  • mfrom: (1.1.1) (2.1.1 experimental)
  • Revision ID: package-import@ubuntu.com-20120613170130-9jggrd76bkv1vd0b
Tags: 1.98.0-1
* New upstream release.
* debian/control: Drop ntfsprogs Recommends. It is a transitional package
  for ntfs-3g now, which we already recommend.
* Add 00git_no_polkit_fallback.patch: Fix crash if polkit is not available.
  Patch backported from current upstream git head.
* Add debian/local/integration-test: Latest integration test suite from
  upstream git. 1.99 and later will ship that in the source tarball.
* Add debian/tests/control and debian/tests/upstream-system: DEP-8
  autopkgtest (adapted from udisks package).
* debian/control: Change suggestion of cryptsetup to cryptsetup-bin, as that
  is sufficient for udisks' needs.
* debian/copyright: Fix duplicate copyright line, thanks lintian.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/python3
 
2
#
 
3
# udisks2 integration test suite
 
4
#
 
5
# Run in udisks built tree to test local built binaries (needs
 
6
# --localstatedir=/var), or from anywhere else to test system installed
 
7
# binaries. 
 
8
#
 
9
# Usage:
 
10
# - Run all tests: 
 
11
#   src/tests/integration-test
 
12
# - Run only a particular class of tests:
 
13
#   src/tests/integration-test Drive
 
14
# - Run only a single test:
 
15
#   src/tests/integration-test FS.test_ext3
 
16
#
 
17
# Copyright: (C) 2011 Martin Pitt <martin.pitt@ubuntu.com>
 
18
#
 
19
# This program is free software; you can redistribute it and/or modify
 
20
# it under the terms of the GNU General Public License as published by
 
21
# the Free Software Foundation; either version 2 of the License, or
 
22
# (at your option) any later version.
 
23
#
 
24
# This program is distributed in the hope that it will be useful,
 
25
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
26
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
27
# GNU General Public License for more details.
 
28
 
 
29
# TODO:
 
30
# - add and test method for changing LUKS passphrase
 
31
# - test Format with take-ownership
 
32
 
 
33
import sys
 
34
import os
 
35
import pwd
 
36
 
 
37
srcdir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
 
38
libdir = os.path.join(srcdir, 'udisks', '.libs')
 
39
 
 
40
# as we can't change LD_LIBRARY_PATH within a running program, and doing
 
41
# #!/usr/bin/env LD_LIBRARY_PATH=... python3 does not work either, do this
 
42
# nasty hack
 
43
if 'LD_LIBRARY_PATH' not in os.environ and os.path.isdir(libdir):
 
44
    os.environ['LD_LIBRARY_PATH'] = libdir
 
45
    os.environ['GI_TYPELIB_PATH'] = '%s/udisks:%s' % (
 
46
            srcdir,
 
47
            os.environ.get('GI_TYPELIB_PATH', ''))
 
48
    os.execv(sys.argv[0], sys.argv)
 
49
    assert False, 'not expecting to land here'
 
50
 
 
51
import subprocess
 
52
import unittest
 
53
import tempfile
 
54
import atexit
 
55
import time
 
56
import shutil
 
57
import signal
 
58
import argparse
 
59
import re
 
60
from glob import glob
 
61
from gi.repository import GLib, Gio, UDisks
 
62
 
 
63
#GI_TYPELIB_PATH=udisks LD_LIBRARY_PATH=udisks/.libs
 
64
VDEV_SIZE = 300000000 # size of virtual test device
 
65
 
 
66
# Those file systems are known to have a broken handling of permissions, in
 
67
# particular the executable bit
 
68
BROKEN_PERMISSIONS_FS = ['ntfs']
 
69
 
 
70
# Some D-BUS API methods cause properties to not be up to date yet when a
 
71
# method call finishes, thus we do an udevadm settle as a workaround. Those
 
72
# methods should eventually get fixed properly, but it's unnerving to have
 
73
# the tests fail on them when you are working on something else. This flag
 
74
# gets set by the --no-workarounds option to disable those syncs, so that these
 
75
# race conditions can be fixed.
 
76
workaround_syncs = False
 
77
 
 
78
no_options = GLib.Variant('a{sv}', {})
 
79
 
 
80
# ----------------------------------------------------------------------------
 
81
 
 
82
class UDisksTestCase(unittest.TestCase):
 
83
    '''Base class for udisks test cases.
 
84
    
 
85
    This provides static functions which are useful for all test cases.
 
86
    '''
 
87
    tool_path = None
 
88
    daemon = None
 
89
    daemon_log = None
 
90
    device = None
 
91
 
 
92
    client = None
 
93
    manager = None
 
94
 
 
95
    @classmethod
 
96
    def init(klass, logfile=None):
 
97
        '''start daemon and set up test environment'''
 
98
 
 
99
        if os.geteuid() != 0:
 
100
            print('this test suite needs to run as root', file=sys.stderr)
 
101
            sys.exit(0)
 
102
 
 
103
        # run from local build tree if we are in one, otherwise use system instance
 
104
        daemon_path = os.path.join(srcdir, 'src', 'udisksd')
 
105
        if (os.access (daemon_path, os.X_OK)):
 
106
            klass.tool_path = 'tools/.libs/udisksctl'
 
107
            print('Testing binaries from local build tree')
 
108
            klass.check_build_tree_config()
 
109
        else:
 
110
            print('Testing installed system binaries')
 
111
            daemon_path = None
 
112
            for l in open('/usr/share/dbus-1/system-services/org.freedesktop.UDisks2.service'):
 
113
                if l.startswith('Exec='):
 
114
                    daemon_path = l.split('=', 1)[1].split()[0]
 
115
                    break
 
116
            assert daemon_path, 'could not determine daemon path from D-BUS .service file'
 
117
 
 
118
            klass.tool_path = 'udisksctl'
 
119
        print('daemon path: ' + daemon_path)
 
120
 
 
121
        for l in open('/usr/share/dbus-1/system-services/org.freedesktop.PolicyKit1.service'):
 
122
            if l.startswith('Exec='):
 
123
                klass.polkit_path = l.split('=', 1)[1].split()[0].strip()
 
124
                break
 
125
        assert klass.polkit_path, 'could not determine polkit path from D-BUS .service file'
 
126
        print('polkitd path: ' + klass.polkit_path)
 
127
 
 
128
        klass.device = klass.setup_vdev()
 
129
 
 
130
        # use a D-BUS config which permits root and nobody
 
131
        klass.dbus_conf = tempfile.NamedTemporaryFile()
 
132
        klass.dbus_conf.write(b'''<!DOCTYPE busconfig PUBLIC "-//freedesktop//DTD D-Bus Bus Configuration 1.0//EN"
 
133
 "http://www.freedesktop.org/standards/dbus/1.0/busconfig.dtd">
 
134
<busconfig>
 
135
  <type>test</type>
 
136
  <listen>unix:tmpdir=/tmp</listen>
 
137
  <policy context="default">
 
138
    <allow send_destination="*" eavesdrop="true"/>
 
139
    <allow eavesdrop="true"/>
 
140
    <allow own="*"/>
 
141
    <allow user="root"/>
 
142
    <allow user="nobody"/>
 
143
  </policy>
 
144
</busconfig>
 
145
''')
 
146
        klass.dbus_conf.flush()
 
147
 
 
148
        # start polkit and udisks on a private DBus
 
149
        dbus = subprocess.Popen(['dbus-launch', '--config-file=' + klass.dbus_conf.name], 
 
150
                stdout=subprocess.PIPE, universal_newlines=True)
 
151
        dbus_out = dbus.communicate()[0]
 
152
        for l in dbus_out.splitlines():
 
153
            k, v = l.split('=', 1)
 
154
            if k == 'DBUS_SESSION_BUS_PID':
 
155
                klass.dbus_pid = int(v)
 
156
            if k == 'DBUS_SESSION_BUS_ADDRESS':
 
157
                os.environ['DBUS_SYSTEM_BUS_ADDRESS'] = v
 
158
        # do not try to communicate with the current desktop session; this will
 
159
        # confuse it, as it cannot see this D-BUS instance
 
160
        try:
 
161
            del os.environ['DISPLAY']
 
162
        except KeyError:
 
163
            pass
 
164
        if logfile:
 
165
            klass.daemon_log = open(logfile, 'w')
 
166
        else:
 
167
            klass.daemon_log = tempfile.TemporaryFile()
 
168
        klass.daemon = subprocess.Popen([daemon_path],
 
169
            stdout=klass.daemon_log, stderr=subprocess.STDOUT)
 
170
        assert klass.daemon.pid, 'daemon failed to start'
 
171
 
 
172
        atexit.register(klass.cleanup)
 
173
 
 
174
        # wait until the daemon has started up
 
175
        timeout = 10
 
176
        while klass.manager is None and timeout > 0:
 
177
            time.sleep(0.2)
 
178
            klass.client = UDisks.Client.new_sync(None)
 
179
            assert klass.client != None
 
180
            klass.manager = klass.client.get_manager()
 
181
            timeout -= 1
 
182
        assert klass.manager, 'daemon failed to start'
 
183
 
 
184
        klass.sync()
 
185
 
 
186
    @classmethod
 
187
    def cleanup(klass):
 
188
        '''stop daemon again and clean up test environment'''
 
189
 
 
190
        subprocess.call(['umount', klass.device], stderr=subprocess.PIPE) # if a test failed
 
191
 
 
192
        os.kill(klass.daemon.pid, signal.SIGTERM)
 
193
        os.wait()
 
194
        klass.daemon = None
 
195
 
 
196
        klass.teardown_vdev(klass.device)
 
197
        klass.device = None
 
198
 
 
199
        klass.dbus_conf.close()
 
200
        del os.environ['DBUS_SYSTEM_BUS_ADDRESS']
 
201
        os.kill(klass.dbus_pid, signal.SIGTERM)
 
202
 
 
203
    @classmethod
 
204
    def sync(klass):
 
205
        '''Wait until pending events finished processing.
 
206
        
 
207
        This should only be called for situations where we genuinely have an
 
208
        asynchronous response, like invoking a CLI program and waiting for
 
209
        udev/udisks to catch up on the change events.
 
210
        '''
 
211
        subprocess.call(['udevadm', 'settle'])
 
212
        klass.client.settle()
 
213
 
 
214
    @classmethod
 
215
    def sync_workaround(klass):
 
216
        '''Wait until pending events finished processing (bug workaround).
 
217
        
 
218
        This should be called for race conditions in the D-BUS API which cause
 
219
        properties to not be up to date yet when a method call finishes. Those
 
220
        should eventually get fixed properly, but it's unnerving to have the
 
221
        tests fail on them when you are working on something else.
 
222
 
 
223
        This sync is not done if running with --no-workarounds.
 
224
        '''
 
225
        if workaround_syncs:
 
226
            klass.sync()
 
227
 
 
228
    @classmethod
 
229
    def zero_device(klass):
 
230
        subprocess.call(['dd', 'if=/dev/zero', 'of='+klass.device, 'bs=10M'],
 
231
                stderr=subprocess.PIPE)
 
232
        klass.sync()
 
233
 
 
234
    @classmethod
 
235
    def devname(klass, partition=None):
 
236
        '''Get name of test device or one of its partitions'''
 
237
 
 
238
        if partition:
 
239
            if klass.device[-1].isdigit():
 
240
                return klass.device + 'p' + str(partition)
 
241
            else:
 
242
                return klass.device + str(partition)
 
243
        else:
 
244
            return klass.device
 
245
 
 
246
    @classmethod
 
247
    def udisks_block(klass, partition=None):
 
248
        '''Get UDisksBlock object for test device or partition'''
 
249
 
 
250
        assert klass.client
 
251
        devname = klass.devname(partition)
 
252
        dev_t = os.stat(devname).st_rdev
 
253
        block = klass.client.get_block_for_dev(dev_t)
 
254
        assert block, 'did not find an UDisksBlock object for %s' % devname
 
255
        return block
 
256
 
 
257
    @classmethod
 
258
    def udisks_filesystem(klass, partition=None):
 
259
        '''Get UDisksFilesystem object for test device or partition
 
260
        
 
261
        Return None if there is no file system on that device.
 
262
        '''
 
263
        block = klass.udisks_block(partition)
 
264
        return klass.client.get_object(block.get_object_path()).get_property('filesystem')
 
265
 
 
266
    @classmethod
 
267
    def blkid(klass, partition=None, device=None):
 
268
        '''Call blkid and return dictionary of results.'''
 
269
 
 
270
        if not device:
 
271
            device = klass.devname(partition)
 
272
        result = {}
 
273
        cmd = subprocess.Popen(['blkid', '-p', '-o', 'udev', device], stdout=subprocess.PIPE)
 
274
        for l in cmd.stdout:
 
275
            (key, value) = l.decode('UTF-8').split('=', 1)
 
276
            result[key] = value.strip()
 
277
        assert cmd.wait() == 0
 
278
        return result
 
279
 
 
280
    @classmethod
 
281
    def is_mountpoint(klass, path):
 
282
        '''Check if given path is a mount point.'''
 
283
 
 
284
        return subprocess.call(['mountpoint', path], stdout=subprocess.PIPE) == 0
 
285
 
 
286
    @classmethod
 
287
    def mkfs(klass, type, label=None, partition=None):
 
288
        '''Create file system using mkfs.'''
 
289
 
 
290
        if type == 'minix':
 
291
            assert label is None, 'minix does not support labels'
 
292
 
 
293
        # work around mkswap not properly cleaning up an existing reiserfs
 
294
        # signature (mailed kzak about it)
 
295
        if type == 'swap':
 
296
            subprocess.check_call(['wipefs', '-a', klass.devname(partition)],
 
297
                    stdout=subprocess.PIPE)
 
298
 
 
299
        mkcmd =     { 'swap': 'mkswap',
 
300
                    }
 
301
        label_opt = { 'vfat': '-n', 
 
302
                      'reiserfs': '-l',
 
303
                    }
 
304
        extra_opt = { 'vfat': [ '-I', '-F', '32'],
 
305
                      'swap': ['-f'],
 
306
                      'xfs': ['-f'], # XFS complains if there's an existing FS, so force
 
307
                      'ext2': ['-F'], # ext* complains about using entire device, so force
 
308
                      'ext3': ['-F'],
 
309
                      'ext4': ['-F'],
 
310
                      'ntfs': ['-F'],
 
311
                      'reiserfs': ['-ff'],
 
312
                    }
 
313
 
 
314
        cmd = [mkcmd.get(type, 'mkfs.' + type)] + extra_opt.get(type, [])
 
315
        if label:
 
316
            cmd += [label_opt.get(type, '-L'), label]
 
317
        cmd.append(klass.devname(partition))
 
318
 
 
319
        subprocess.check_call(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
320
 
 
321
        # kernel/udev generally detect those changes itself, but do not quite
 
322
        # tell us when they are done; so do a little kludge here to know how
 
323
        # long we need to wait
 
324
        subprocess.call(['udevadm', 'trigger', '--action=change',
 
325
            '--sysname-match=' + os.path.basename(klass.devname(partition))])
 
326
        klass.sync()
 
327
 
 
328
    @classmethod
 
329
    def fs_create(klass, partition, type, options):
 
330
        '''Create file system using udisks.'''
 
331
 
 
332
        block = klass.udisks_block(partition)
 
333
        block.call_format_sync(type, options, None)
 
334
        klass.sync_workaround()
 
335
 
 
336
    @classmethod
 
337
    def retry_busy(klass, fn, *args):
 
338
        '''Call a function until it does not fail with "Busy".'''
 
339
 
 
340
        timeout = 10
 
341
        while timeout >= 0:
 
342
            try:
 
343
                return fn(*args)
 
344
            except GLib.GError as e:
 
345
                if not 'UDisks2.Error.DeviceBusy' in e.message:
 
346
                    raise
 
347
                sys.stderr.write('[busy] ')
 
348
                time.sleep(0.3)
 
349
                timeout -= 1
 
350
 
 
351
    @classmethod
 
352
    def check_build_tree_config(klass):
 
353
        '''Check configuration of build tree'''
 
354
 
 
355
        # read make variables
 
356
        make_vars = {}
 
357
        var_re = re.compile('^([a-zA-Z_]+) = (.*)$')
 
358
        make = subprocess.Popen(['make', '-p', '/dev/null'],
 
359
                stdout=subprocess.PIPE)
 
360
        for l in make.stdout:
 
361
            l = l.decode('UTF-8')
 
362
            m = var_re.match(l)
 
363
            if m:
 
364
                make_vars[m.group(1)] = m.group(2)
 
365
        make.wait()
 
366
 
 
367
        # expand make variables
 
368
        subst_re = re.compile('\${([a-zA-Z_]+)}')
 
369
        for (k, v) in make_vars.items():
 
370
            while True:
 
371
                m = subst_re.search(v)
 
372
                if m:
 
373
                    v = subst_re.sub(make_vars.get(m.group(1), ''), v)
 
374
                    make_vars[k] = v
 
375
                else:
 
376
                    break
 
377
 
 
378
        # check localstatedir
 
379
        for d in (os.path.join(make_vars['localstatedir'], 'run', 'udisks2'),
 
380
                os.path.join(make_vars['localstatedir'], 'lib', 'udisks2')):
 
381
            if not os.path.exists(d):
 
382
                sys.stderr.write('The directory %s does not exist; please create it before running these tests.\n' % d)
 
383
                sys.exit(0)
 
384
        
 
385
    @classmethod
 
386
    def setup_vdev(klass):
 
387
        '''create virtual test device
 
388
        
 
389
        It is zeroed out initially.
 
390
 
 
391
        Return the device path.
 
392
        '''
 
393
        # ensure that the scsi_debug module is loaded
 
394
        if os.path.isdir('/sys/module/scsi_debug'):
 
395
            sys.stderr.write('The scsi_debug module is already loaded; please remove before running this test.\n')
 
396
            sys.exit(1)
 
397
 
 
398
        assert subprocess.call(['modprobe', 'scsi_debug', 'dev_size_mb=%i' % (
 
399
            VDEV_SIZE/1048576)]) == 0, 'Failure to modprobe scsi_debug'
 
400
 
 
401
        # wait until all drives are created
 
402
        dirs = []
 
403
        while len(dirs) < 1:
 
404
            dirs = glob('/sys/bus/pseudo/drivers/scsi_debug/adapter*/host*/target*/*:*/block')
 
405
            time.sleep(0.1)
 
406
        assert len(dirs) == 1
 
407
 
 
408
        # determine the debug block devices
 
409
        devs = os.listdir(dirs[0])
 
410
        assert len(devs) == 1
 
411
        dev = '/dev/' + devs[0]
 
412
        assert os.path.exists(dev)
 
413
 
 
414
        # let's be 100% sure that we pick a virtual one
 
415
        assert open('/sys/block/%s/device/model' % devs[0]).read().strip() == 'scsi_debug'
 
416
 
 
417
        print('Set up test device: ' + dev)
 
418
        return dev
 
419
 
 
420
    @classmethod
 
421
    def teardown_vdev(klass, device):
 
422
        '''release and remove virtual test device'''
 
423
 
 
424
        klass.remove_device(device)
 
425
        assert subprocess.call(['rmmod', 'scsi_debug']) == 0, \
 
426
                'Failure to rmmod scsi_debug'
 
427
 
 
428
    @classmethod
 
429
    def remove_device(klass, device):
 
430
        '''remove virtual test device'''
 
431
 
 
432
        device = device.split('/')[-1]
 
433
        if os.path.exists('/sys/block/' + device):
 
434
            f = open('/sys/block/%s/device/delete' % device, 'w')
 
435
            f.write('1')
 
436
            f.close()
 
437
        while os.path.exists(device):
 
438
            time.sleep(0.1)
 
439
        klass.sync()
 
440
        time.sleep(0.5) # TODO
 
441
 
 
442
    @classmethod
 
443
    def readd_devices(klass):
 
444
        '''re-add virtual test devices after removal'''
 
445
 
 
446
        scan_files = glob('/sys/bus/pseudo/devices/adapter*/host*/scsi_host/host*/scan')
 
447
        assert len(scan_files) > 0
 
448
        for f in scan_files:
 
449
            open(f, 'w').write('- - -\n')
 
450
        while not os.path.exists(klass.device):
 
451
            time.sleep(0.1)
 
452
        time.sleep(0.5)
 
453
        klass.sync()
 
454
 
 
455
# ----------------------------------------------------------------------------
 
456
 
 
457
class Manager(UDisksTestCase):
 
458
    '''UDisksManager operations'''
 
459
 
 
460
    def test_version(self):
 
461
        '''daemon version'''
 
462
 
 
463
        self.assertTrue(self.manager.get_property('version')[0].isdigit())
 
464
 
 
465
    def test_loop_rw(self):
 
466
        '''loop device R/W'''
 
467
 
 
468
        with tempfile.NamedTemporaryFile() as f:
 
469
            f.truncate(100000000)
 
470
            fd_list = Gio.UnixFDList.new_from_array([f.fileno()])
 
471
 
 
472
            (path, out_fd_list) = self.manager.call_loop_setup_sync(
 
473
                GLib.Variant('h', 0), # fd index
 
474
                no_options,
 
475
                fd_list,
 
476
                None)
 
477
            self.client.settle()
 
478
 
 
479
            obj = self.client.get_object(path)
 
480
            loop = obj.get_property('loop')
 
481
            block = obj.get_property('block')
 
482
            self.assertNotEqual(block, None)
 
483
            self.assertNotEqual(loop, None)
 
484
            self.assertEqual(obj.get_property('filesystem'), None)
 
485
 
 
486
            try:
 
487
                self.assertEqual(loop.get_property('backing-file'), f.name)
 
488
 
 
489
                options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', 'foo')})
 
490
                block.call_format_sync('ext2', options, None)
 
491
                self.client.settle()
 
492
                self.assertNotEqual(obj.get_property('filesystem'), None)
 
493
 
 
494
                self.assertEqual(block.get_property('id-label'), 'foo')
 
495
                self.assertEqual(block.get_property('id-usage'), 'filesystem')
 
496
                self.assertEqual(block.get_property('id-type'), 'ext2')
 
497
            finally:
 
498
                loop.call_delete_sync(no_options, None)
 
499
 
 
500
    def test_loop_ro(self):
 
501
        '''loop device R/O'''
 
502
 
 
503
        with tempfile.NamedTemporaryFile() as f:
 
504
            f.truncate(100000000)
 
505
            fd_list = Gio.UnixFDList.new_from_array([f.fileno()])
 
506
 
 
507
            (path, out_fd_list) = self.manager.call_loop_setup_sync(
 
508
                GLib.Variant('h', 0), # fd index
 
509
                GLib.Variant('a{sv}', {'read-only': GLib.Variant('b', True)}),
 
510
                fd_list,
 
511
                None)
 
512
            self.client.settle()
 
513
 
 
514
            obj = self.client.get_object(path)
 
515
            loop = obj.get_property('loop')
 
516
            block = obj.get_property('block')
 
517
            self.assertNotEqual(block, None)
 
518
            self.assertNotEqual(loop, None)
 
519
            self.assertEqual(obj.get_property('filesystem'), None)
 
520
 
 
521
            try:
 
522
                self.assertEqual(loop.get_property('backing-file'), f.name)
 
523
 
 
524
                # can't format due to permission error
 
525
                self.assertRaises(GLib.GError, block.call_format_sync, 'ext2',
 
526
                        no_options, None)
 
527
 
 
528
                self.assertEqual(block.get_property('id-label'), '')
 
529
                self.assertEqual(block.get_property('id-usage'), '')
 
530
                self.assertEqual(block.get_property('id-type'), '')
 
531
            finally:
 
532
                self.client.settle()
 
533
                loop.call_delete_sync(no_options, None)
 
534
 
 
535
# ----------------------------------------------------------------------------
 
536
 
 
537
class Drive(UDisksTestCase):
 
538
    '''UDisksDrive'''
 
539
 
 
540
    def setUp(self):
 
541
        self.drive = self.client.get_drive_for_block(self.udisks_block())
 
542
        self.assertNotEqual(self.drive, None)
 
543
 
 
544
    def test_properties(self):
 
545
        '''properties of UDisksDrive object'''
 
546
 
 
547
        self.assertEqual(self.drive.get_property('model'), 'scsi_debug')
 
548
        self.assertEqual(self.drive.get_property('vendor'), 'Linux')
 
549
        self.assertAlmostEqual(self.drive.get_property('size')/1.e6, VDEV_SIZE/1.e6, 0)
 
550
        self.assertEqual(self.drive.get_property('media-available'), True)
 
551
        self.assertEqual(self.drive.get_property('optical'), False)
 
552
 
 
553
        self.assertNotEqual(len(self.drive.get_property('serial')), 0)
 
554
        self.assertNotEqual(len(self.drive.get_property('revision')), 0)
 
555
 
 
556
# ----------------------------------------------------------------------------
 
557
 
 
558
class FS(UDisksTestCase):
 
559
    '''Test detection of all supported file systems'''
 
560
 
 
561
    def setUp(self):
 
562
        self.workdir = tempfile.mkdtemp()
 
563
        self.block = self.udisks_block()
 
564
        self.assertNotEqual(self.block, None)
 
565
 
 
566
    def tearDown(self):
 
567
        if subprocess.call(['umount', self.device], stderr=subprocess.PIPE) == 0:
 
568
            sys.stderr.write('[cleanup unmount] ')
 
569
        shutil.rmtree (self.workdir)
 
570
 
 
571
    def test_zero(self):
 
572
        '''properties of zeroed out device'''
 
573
 
 
574
        self.zero_device()
 
575
        self.assertEqual(self.block.get_property('device'), self.device)
 
576
        self.assertTrue('Linux_scsi_debug' in self.block.get_property('drive'))
 
577
        self.assertEqual(self.block.get_property('hint-system'), True)
 
578
        self.assertEqual(self.block.get_property('id-label'), '')
 
579
        self.assertEqual(self.block.get_property('id-usage'), '')
 
580
        self.assertEqual(self.block.get_property('id-type'), '')
 
581
        self.assertEqual(self.block.get_property('id-uuid'), '')
 
582
        self.assertAlmostEqual(self.block.get_property('size')/1.e6, VDEV_SIZE/1.e6, 0)
 
583
        obj = self.client.get_object(self.block.get_object_path())
 
584
        self.assertEqual(obj.get_property('filesystem'), None)
 
585
        self.assertEqual(obj.get_property('partition'), None)
 
586
        self.assertEqual(obj.get_property('partition-table'), None)
 
587
 
 
588
    def test_ext2(self):
 
589
        '''fs: ext2'''
 
590
        self._do_fs_check('ext2')
 
591
 
 
592
    def test_ext3(self):
 
593
        '''fs: ext3'''
 
594
        self._do_fs_check('ext3')
 
595
 
 
596
    def test_ext4(self):
 
597
        '''fs: ext4'''
 
598
        self._do_fs_check('ext4')
 
599
 
 
600
    def test_btrfs(self):
 
601
        '''fs: btrfs'''
 
602
        self._do_fs_check('btrfs')
 
603
 
 
604
    def test_minix(self):
 
605
        '''fs: minix'''
 
606
        self._do_fs_check('minix')
 
607
 
 
608
    def test_xfs(self):
 
609
        '''fs: XFS'''
 
610
        self._do_fs_check('xfs')
 
611
 
 
612
    def test_ntfs(self):
 
613
        '''fs: NTFS'''
 
614
        self._do_fs_check('ntfs')
 
615
 
 
616
    def test_vfat(self):
 
617
        '''fs: FAT'''
 
618
        self._do_fs_check('vfat')
 
619
 
 
620
    def test_reiserfs(self):
 
621
        '''fs: reiserfs'''
 
622
        self._do_fs_check('reiserfs')
 
623
 
 
624
    def test_swap(self):
 
625
        '''fs: swap'''
 
626
        self._do_fs_check('swap')
 
627
 
 
628
    def test_nilfs2(self):
 
629
        '''fs: nilfs2'''
 
630
        self._do_fs_check('nilfs2')
 
631
 
 
632
    def test_empty(self):
 
633
        '''fs: empty'''
 
634
 
 
635
        self.mkfs('ext4', 'foo')
 
636
        block = self.udisks_block()
 
637
        self.assertEqual(block.get_property('id-usage'), 'filesystem')
 
638
        self.assertEqual(block.get_property('id-type'), 'ext4')
 
639
        self.assertEqual(block.get_property('id-label'), 'foo')
 
640
        self.assertNotEqual(self.udisks_filesystem(), None)
 
641
 
 
642
        self.fs_create(None, 'empty', no_options)
 
643
 
 
644
        self.assertEqual(block.get_property('id-usage'), '')
 
645
        self.assertEqual(block.get_property('id-type'), '')
 
646
        self.assertEqual(block.get_property('id-label'), '')
 
647
        self.assertEqual(self.udisks_filesystem(), None)
 
648
 
 
649
    def test_create_fs_unknown_type(self):
 
650
        '''Format() with unknown type'''
 
651
 
 
652
        try:
 
653
            self.fs_create(None, 'bogus', no_options)
 
654
            self.fail('Expected failure for bogus file system')
 
655
        except GLib.GError as e:
 
656
            self.assertTrue('UDisks2.Error.NotSupported' in e.message)
 
657
            self.assertTrue('type bogus' in e.message)
 
658
 
 
659
    def test_create_fs_unsupported_label(self):
 
660
        '''Format() with unsupported label'''
 
661
 
 
662
        options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', 'foo')})
 
663
        try:
 
664
            self.fs_create(None, 'minix', options)
 
665
            self.fail('Expected failure for unsupported label')
 
666
        except GLib.GError as e:
 
667
            self.assertTrue('UDisks2.Error.NotSupported' in e.message)
 
668
 
 
669
    def test_force_removal(self):
 
670
        '''fs: forced removal'''
 
671
 
 
672
        # create a fs and mount it
 
673
        self.mkfs('ext4', 'udiskstest')
 
674
        fs = self.udisks_filesystem()
 
675
        mount_path = fs.call_mount_sync(no_options, None)
 
676
        self.assertTrue(mount_path.endswith('udiskstest'))
 
677
        self.assertTrue('/media/' in mount_path)
 
678
        self.assertTrue(self.is_mountpoint(mount_path))
 
679
 
 
680
        dev_t = os.stat(self.devname()).st_rdev
 
681
 
 
682
        # removal should clean up mounts
 
683
        self.remove_device(self.device)
 
684
        self.assertFalse(os.path.exists(mount_path))
 
685
        self.assertEqual(self.client.get_block_for_dev(dev_t), None)
 
686
 
 
687
        # after putting it back, it should be mountable again
 
688
        self.readd_devices()
 
689
        fs = self.udisks_filesystem()
 
690
        self.assertEqual(fs.get_property('mount-points'), [])
 
691
 
 
692
        mount_path = fs.call_mount_sync(no_options, None)
 
693
        self.assertTrue(mount_path.endswith('udiskstest'))
 
694
        self.assertTrue('/media/' in mount_path)
 
695
        self.assertTrue(self.is_mountpoint(mount_path))
 
696
        self.client.settle()
 
697
        self.assertEqual(fs.get_property('mount-points'), [mount_path])
 
698
 
 
699
        self.retry_busy(fs.call_unmount_sync, no_options, None)
 
700
        self.client.settle()
 
701
        self.assertEqual(fs.get_property('mount-points'), [])
 
702
 
 
703
    def _do_fs_check(self, type):
 
704
        '''Run checks for a particular file system.'''
 
705
 
 
706
        if type != 'swap' and subprocess.call(['which', 'mkfs.' + type],
 
707
                stdout=subprocess.PIPE) != 0:
 
708
            sys.stderr.write('[no mkfs.%s, skip] ' % type)
 
709
 
 
710
            # check correct D-Bus exception
 
711
            try:
 
712
                self.fs_create(None, type, no_options)
 
713
                self.fail('Expected failure for missing mkfs.' + type)
 
714
            except GLib.GError as e:
 
715
                self.assertTrue('UDisks2.Error.Failed' in e.message)
 
716
            return
 
717
 
 
718
        # do checks with command line tools (mkfs/mount/umount)
 
719
        sys.stderr.write('[cli] ')
 
720
        sys.stderr.flush()
 
721
 
 
722
        self._do_cli_check(type)
 
723
        if type != 'minix':
 
724
            self._do_cli_check(type, 'test%stst' % type)
 
725
 
 
726
        # put a different fs here instead of zeroing, so that we verify that
 
727
        # udisks overrides existing FS (e. g. XFS complains then), and does not
 
728
        # leave traces of other FS around
 
729
        if type == 'ext3':
 
730
            self.mkfs('swap')
 
731
        else:
 
732
            self.mkfs('ext3')
 
733
 
 
734
        # do checks with udisks operations
 
735
        sys.stderr.write('[ud] ')
 
736
        self._do_udisks_check(type)
 
737
        if type != 'minix':
 
738
            self._do_udisks_check(type, 'test%stst' % type)
 
739
            # also test fs_create with an empty label
 
740
            self._do_udisks_check(type, '')
 
741
 
 
742
    def _do_cli_check(self, type, label=None):
 
743
        '''udisks correctly picks up file system changes from command line tools'''
 
744
 
 
745
        self.mkfs(type, label)
 
746
 
 
747
        block = self.udisks_block()
 
748
 
 
749
        self.assertEqual(block.get_property('id-usage'), (type == 'swap') and 'other' or 'filesystem')
 
750
 
 
751
        self.assertEqual(block.get_property('id-type'), type)
 
752
        self.assertEqual(block.get_property('id-label'), label or '')
 
753
        self.assertEqual(block.get_property('hint-name'), '')
 
754
        if type != 'minix':
 
755
            self.assertEqual(block.get_property('id-uuid'), self.blkid()['ID_FS_UUID'])
 
756
 
 
757
        obj = self.client.get_object(self.block.get_object_path())
 
758
        self.assertEqual(obj.get_property('partition'), None)
 
759
        self.assertEqual(obj.get_property('partition-table'), None)
 
760
 
 
761
        fs = obj.get_property('filesystem')
 
762
        if type == 'swap':
 
763
            self.assertEqual(fs, None)
 
764
        else:
 
765
            self.assertNotEqual(fs, None)
 
766
 
 
767
        if type == 'swap':
 
768
            return
 
769
 
 
770
        # mount it
 
771
        if type == 'ntfs' and subprocess.call(['which', 'mount.ntfs-3g'],
 
772
                stdout=subprocess.PIPE) == 0:
 
773
            # prefer mount.ntfs-3g if we have it (on Debian; Ubuntu
 
774
            # defaults to ntfs-3g if installed); TODO: check other distros
 
775
            mount_prog = 'mount.ntfs-3g'
 
776
        else:
 
777
            mount_prog = 'mount'
 
778
        ret = subprocess.call([mount_prog, self.device, self.workdir])
 
779
        if ret == 32:
 
780
            # missing fs driver
 
781
            sys.stderr.write('[missing kernel driver, skip] ')
 
782
            return
 
783
        self.assertEqual(ret, 0)
 
784
 
 
785
        self.sync()
 
786
        self.assertEqual(fs.get_property('mount-points'), [self.workdir])
 
787
 
 
788
        # unmount it
 
789
        subprocess.call(['umount', self.workdir])
 
790
        self.sync()
 
791
        self.assertEqual(fs.get_property('mount-points'), [])
 
792
 
 
793
    def _do_udisks_check(self, type, label=None):
 
794
        '''udisks API correctly changes file system'''
 
795
 
 
796
        # create fs 
 
797
        if label is not None:
 
798
            options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', label)})
 
799
        else:
 
800
            options = no_options
 
801
        self.fs_create(None, type, options)
 
802
 
 
803
        # properties
 
804
        id = self.blkid()
 
805
        self.assertEqual(id['ID_FS_USAGE'], type == 'swap' and 'other' or 'filesystem')
 
806
        self.assertEqual(id['ID_FS_TYPE'], type)
 
807
        self.assertEqual(id.get('ID_FS_LABEL', ''), label or '')
 
808
 
 
809
        block = self.udisks_block()
 
810
        self.assertEqual(block.get_property('id-usage'), (type == 'swap') and 'other' or 'filesystem')
 
811
        self.assertEqual(block.get_property('id-type'), type)
 
812
        self.assertEqual(block.get_property('id-label'), label or '')
 
813
 
 
814
        if type == 'swap':
 
815
            return
 
816
 
 
817
        obj = self.client.get_object(self.block.get_object_path())
 
818
        self.assertEqual(obj.get_property('partition'), None)
 
819
        self.assertEqual(obj.get_property('partition-table'), None)
 
820
 
 
821
        fs = self.udisks_filesystem()
 
822
        self.assertNotEqual(fs, None, 'no Filesystem interface for test device')
 
823
        self.assertEqual(fs.get_property('mount-points'), [])
 
824
 
 
825
        # mount
 
826
        mount_path = fs.call_mount_sync(no_options, None)
 
827
 
 
828
        self.assertTrue(mount_path.startswith('/run/media/'), mount_path)
 
829
        if label:
 
830
            self.assertTrue(mount_path.endswith(label))
 
831
 
 
832
        self.sync()
 
833
        self.assertEqual(fs.get_property('mount-points'), [mount_path])
 
834
        self.assertTrue(self.is_mountpoint(mount_path))
 
835
 
 
836
        # no ownership taken, should be root owned
 
837
        st = os.stat(mount_path)
 
838
        self.assertEqual((st.st_uid, st.st_gid), (0, 0))
 
839
 
 
840
        self._do_file_perms_checks(type, mount_path)
 
841
 
 
842
        # unmount
 
843
        self.retry_busy(fs.call_unmount_sync, no_options, None)
 
844
        self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
 
845
        self.assertEqual(fs.get_property('mount-points'), [mount_path])
 
846
 
 
847
        # create fs with taking ownership (daemon:mail == 1:8)
 
848
        #if supports_unix_owners:
 
849
        #    options.append('take_ownership_uid=1')
 
850
        #    options.append('take_ownership_gid=8')
 
851
        #    self.fs_create(None, type, options)
 
852
        #    mount_path = iface.FilesystemMount('', [])
 
853
        #    st = os.stat(mount_path)
 
854
        #    self.assertEqual((st.st_uid, st.st_gid), (1, 8))
 
855
        #    self.retry_busy(self.partition_iface().FilesystemUnmount, [])
 
856
        #    self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
 
857
 
 
858
        # change label
 
859
        supported = True
 
860
        l = 'n"a\m\\"e' + type
 
861
        if type == 'vfat':
 
862
            # VFAT does not support some characters
 
863
            self.assertRaises(GLib.GError, fs.call_set_label_sync, l, 
 
864
                    no_options, None)
 
865
            l = "n@a$me"
 
866
        try:
 
867
            fs.call_set_label_sync(l, no_options, None)
 
868
        except GLib.GError as e:
 
869
            if 'UDisks2.Error.NotSupported' in e.message:
 
870
                # these fses are known to not support relabeling
 
871
                self.assertTrue(type in ['minix', 'btrfs'])
 
872
                supported = False
 
873
            else:
 
874
                raise
 
875
 
 
876
        if supported:
 
877
            block = self.udisks_block()
 
878
            blkid_label = self.blkid().get('ID_FS_LABEL_ENC', '').replace('\\x22', '"').replace(
 
879
                    '\\x5c', '\\').replace('\\x24', '$')
 
880
            self.sync_workaround()
 
881
            if type == 'vfat':
 
882
                # EXFAIL: often (but not always) the label appears in all upper case
 
883
                self.assertEqual(blkid_label.upper(), l.upper())
 
884
                self.assertEqual(block.get_property('id-label').upper(), l.upper())
 
885
            else:
 
886
                self.assertEqual(blkid_label, l)
 
887
                self.assertEqual(block.get_property('id-label'), l)
 
888
 
 
889
            # test setting empty label
 
890
            fs.call_set_label_sync('', no_options, None)
 
891
            self.sync_workaround()
 
892
            self.assertEqual(self.blkid().get('ID_FS_LABEL_ENC', ''), '')
 
893
            self.assertEqual(block.get_property('id-label'), '')
 
894
 
 
895
        # check fs - Not implemented in udisks yet
 
896
        #self.assertEqual(iface.FilesystemCheck([]), True)
 
897
 
 
898
    def _do_file_perms_checks(self, type, mount_point):
 
899
        '''Check for permissions for data files and executables.
 
900
 
 
901
        This particularly checks sane and useful permissions on non-Unix file
 
902
        systems like vfat.
 
903
        '''
 
904
        if type in BROKEN_PERMISSIONS_FS:
 
905
            return
 
906
 
 
907
        f = os.path.join(mount_point, 'simpledata.txt')
 
908
        open(f, 'w').close()
 
909
        self.assertTrue(os.access(f, os.R_OK))
 
910
        self.assertTrue(os.access(f, os.W_OK))
 
911
        self.assertFalse(os.access(f, os.X_OK))
 
912
 
 
913
        f = os.path.join(mount_point, 'simple.exe')
 
914
        shutil.copy('/bin/bash', f)
 
915
        self.assertTrue(os.access(f, os.R_OK))
 
916
        self.assertTrue(os.access(f, os.W_OK))
 
917
        self.assertTrue(os.access(f, os.X_OK))
 
918
 
 
919
        os.mkdir(os.path.join(mount_point, 'subdir'))
 
920
        f = os.path.join(mount_point, 'subdir', 'subdirdata.txt')
 
921
        open(f, 'w').close()
 
922
        self.assertTrue(os.access(f, os.R_OK))
 
923
        self.assertTrue(os.access(f, os.W_OK))
 
924
        self.assertFalse(os.access(f, os.X_OK))
 
925
 
 
926
        f = os.path.join(mount_point, 'subdir', 'subdir.exe')
 
927
        shutil.copy('/bin/bash', f)
 
928
        self.assertTrue(os.access(f, os.R_OK))
 
929
        self.assertTrue(os.access(f, os.W_OK))
 
930
        self.assertTrue(os.access(f, os.X_OK))
 
931
 
 
932
## ----------------------------------------------------------------------------
 
933
 
 
934
class Smart(UDisksTestCase):
 
935
    '''Check SMART operation.'''
 
936
 
 
937
    def test_sda(self):
 
938
        '''SMART status of first internal hard disk
 
939
        
 
940
        This is a best-effort readonly test.
 
941
        '''
 
942
        hd = '/dev/sda'
 
943
 
 
944
        if not os.path.exists(hd):
 
945
            sys.stderr.write('[skip] ')
 
946
            return
 
947
 
 
948
        has_smart = subprocess.call(['skdump', '--can-smart', hd],
 
949
                stdout=subprocess.PIPE, stderr=subprocess.STDOUT) == 0
 
950
 
 
951
        block = self.client.get_block_for_dev(os.stat(hd).st_rdev)
 
952
        self.assertNotEqual(block, None)
 
953
        drive = self.client.get_drive_for_block(block)
 
954
        ata = self.client.get_object(drive.get_object_path()).get_property('drive-ata')
 
955
        self.assertEqual(ata != None, has_smart)
 
956
 
 
957
        if has_smart:
 
958
            sys.stderr.write('[avail] ')
 
959
            self.assertEqual(ata.get_property('smart-supported'), True)
 
960
            self.assertEqual(ata.get_property('smart-enabled'), True)
 
961
 
 
962
            # wait for SMART data to be read
 
963
            while ata.get_property('smart-updated') == 0:
 
964
                sys.stderr.write('[wait for data] ')
 
965
                time.sleep(0.5)
 
966
 
 
967
            # this is of course not truly correct for a test suite, but let's
 
968
            # consider it a courtesy for developers :-)
 
969
            self.assertEqual(ata.get_property('smart-failing'), False)
 
970
            self.assertTrue(ata.get_property('smart-selftest-status') in ['success', 'inprogress'])
 
971
        else:
 
972
            sys.stderr.write('[N/A] ')
 
973
 
 
974
 
 
975
# ----------------------------------------------------------------------------
 
976
 
 
977
class Luks(UDisksTestCase):
 
978
    '''Check LUKS.'''
 
979
 
 
980
    def tearDown(self):
 
981
        '''clean up behind failed test cases'''
 
982
 
 
983
        crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
 
984
        if crypt_obj:
 
985
            encrypted = crypt_obj.get_property('encrypted')
 
986
            if encrypted:
 
987
                try:
 
988
                    encrypted.call_lock_sync(no_options, None)
 
989
                    sys.stderr.write('[cleanup lock] ')
 
990
                except GLib.GError:
 
991
                    pass
 
992
 
 
993
    # needs to run before the other tests
 
994
    def test_0_create_teardown(self):
 
995
        '''LUKS create/teardown'''
 
996
 
 
997
        self.fs_create(None, 'ext4', GLib.Variant('a{sv}', {
 
998
            'encrypt.passphrase': GLib.Variant('s', 's3kr1t'),
 
999
            'label': GLib.Variant('s', 'treasure'),
 
1000
            }))
 
1001
 
 
1002
        try:
 
1003
            block = self.udisks_block()
 
1004
            obj = self.client.get_object(block.get_object_path())
 
1005
            self.assertEqual(obj.get_property('filesystem'), None)
 
1006
            encrypted = obj.get_property('encrypted')
 
1007
            self.assertNotEqual(encrypted, None)
 
1008
 
 
1009
            # check crypted device info
 
1010
            self.assertEqual(block.get_property('id-type'), 'crypto_LUKS')
 
1011
            self.assertEqual(block.get_property('id-usage'), 'crypto')
 
1012
            self.assertEqual(block.get_property('id-label'), '')
 
1013
            self.assertEqual(block.get_property('id-uuid'), self.blkid()['ID_FS_UUID'])
 
1014
            self.assertEqual(block.get_property('device'), self.devname())
 
1015
 
 
1016
            # check whether we can lock/unlock; we also need this to get the
 
1017
            # cleartext device
 
1018
            encrypted.call_lock_sync(no_options, None)
 
1019
            self.assertRaises(GLib.GError, encrypted.call_lock_sync, 
 
1020
                    no_options, None)
 
1021
            
 
1022
            # wrong password
 
1023
            self.assertRaises(GLib.GError, encrypted.call_unlock_sync, 
 
1024
                    'h4ckpassword', no_options, None)
 
1025
            # right password
 
1026
            clear_path = encrypted.call_unlock_sync('s3kr1t', 
 
1027
                    no_options, None)
 
1028
 
 
1029
            # check cleartext device info
 
1030
            clear_obj = self.client.get_object(clear_path)
 
1031
            self.assertEqual(clear_obj.get_property('encrypted'), None)
 
1032
            clear_block = clear_obj.get_property('block')
 
1033
            self.assertEqual(clear_block.get_property('id-type'), 'ext4')
 
1034
            self.assertEqual(clear_block.get_property('id-usage'), 'filesystem')
 
1035
            self.assertEqual(clear_block.get_property('id-label'), 'treasure')
 
1036
            self.assertNotEqual(clear_block.get_property('crypto-backing-device'), None)
 
1037
            clear_dev = clear_block.get_property('device')
 
1038
            self.assertNotEqual(clear_dev, None)
 
1039
            self.assertEqual(clear_block.get_property('id-uuid'),
 
1040
                    self.blkid(device=clear_dev)['ID_FS_UUID'])
 
1041
 
 
1042
            clear_fs = clear_obj.get_property('filesystem')
 
1043
            self.assertEqual(clear_fs.get_property('mount-points'), [])
 
1044
 
 
1045
            # check that we do not leak key information
 
1046
            udev_dump = subprocess.Popen(['udevadm', 'info', '--export-db'],
 
1047
                    stdout=subprocess.PIPE)
 
1048
            out = udev_dump.communicate()[0]
 
1049
            self.assertFalse(b's3kr1t' in out, 'password in udev properties')
 
1050
            self.assertFalse(b'essiv:sha' in out, 'key information in udev properties')
 
1051
 
 
1052
        finally:
 
1053
            # tear down cleartext device
 
1054
            encrypted.call_lock_sync(no_options, None)
 
1055
            self.assertFalse(os.path.exists(clear_dev))
 
1056
 
 
1057
    def test_luks_mount(self):
 
1058
        '''LUKS mount/unmount'''
 
1059
 
 
1060
        crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
 
1061
        encrypted = crypt_obj.get_property('encrypted')
 
1062
 
 
1063
        path = encrypted.call_unlock_sync('s3kr1t', 
 
1064
                no_options, None)
 
1065
        self.client.settle()
 
1066
        obj = self.client.get_object(path)
 
1067
        fs = obj.get_property('filesystem')
 
1068
        self.assertNotEqual(fs, None)
 
1069
 
 
1070
        # mount
 
1071
        mount_path = fs.call_mount_sync(no_options, None)
 
1072
 
 
1073
        try:
 
1074
            self.assertTrue('/media/' in mount_path)
 
1075
            self.assertTrue(mount_path.endswith('treasure'))
 
1076
            self.assertTrue(self.is_mountpoint(mount_path))
 
1077
            self.client.settle()
 
1078
            self.assertEqual(fs.get_property('mount-points'), [mount_path])
 
1079
 
 
1080
            # can't lock, busy
 
1081
            try:
 
1082
                encrypted.call_lock_sync(no_options, None)
 
1083
                self.fail('Lock() unexpectedly succeeded on mounted file system')
 
1084
            except GLib.GError as e:
 
1085
                self.assertTrue('UDisks2.Error.Failed' in e.message)
 
1086
        finally:
 
1087
            # umount
 
1088
            self.retry_busy(fs.call_unmount_sync, no_options, None)
 
1089
            self.client.settle()
 
1090
            self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
 
1091
            self.assertEqual(fs.get_property('mount-points'), [])
 
1092
 
 
1093
            # lock
 
1094
            encrypted.call_lock_sync(no_options, None)
 
1095
            self.client.settle()
 
1096
            self.assertEqual(self.client.get_object(path), None)
 
1097
 
 
1098
    def test_luks_forced_removal(self):
 
1099
        '''LUKS forced removal'''
 
1100
 
 
1101
        # unlock and mount it
 
1102
        crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
 
1103
        path = crypt_obj.get_property('encrypted').call_unlock_sync('s3kr1t', 
 
1104
                no_options, None)
 
1105
        try:
 
1106
            fs = self.client.get_object(path).get_property('filesystem')
 
1107
            mount_path = fs.call_mount_sync(no_options, None)
 
1108
            self.assertTrue('/media/' in mount_path)
 
1109
            self.assertTrue(mount_path.endswith('treasure'))
 
1110
 
 
1111
            # removal should clean up mounts
 
1112
            self.remove_device(self.device)
 
1113
            self.assertFalse(os.path.exists(mount_path))
 
1114
            self.assertEqual(self.client.get_object(path), None)
 
1115
 
 
1116
            # after putting it back, it should be mountable again
 
1117
            self.readd_devices()
 
1118
            crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
 
1119
            path = crypt_obj.get_property('encrypted').call_unlock_sync('s3kr1t', 
 
1120
                    no_options, None)
 
1121
            self.client.settle()
 
1122
            fs = self.client.get_object(path).get_property('filesystem')
 
1123
            mount_path = fs.call_mount_sync(no_options, None)
 
1124
            self.assertTrue('/media/' in mount_path)
 
1125
            self.assertTrue(mount_path.endswith('treasure'))
 
1126
 
 
1127
            # umount
 
1128
            self.retry_busy(fs.call_unmount_sync, no_options, None)
 
1129
            self.client.settle()
 
1130
            self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
 
1131
            self.assertEqual(fs.get_property('mount-points'), [])
 
1132
        finally:
 
1133
            # lock
 
1134
            crypt_obj.get_property('encrypted').call_lock_sync(
 
1135
                    no_options, None)
 
1136
            self.client.settle()
 
1137
            self.assertEqual(self.client.get_object(path), None)
 
1138
 
 
1139
# ----------------------------------------------------------------------------
 
1140
 
 
1141
class Polkit(UDisksTestCase):
 
1142
    '''Check operation with polkit.'''
 
1143
 
 
1144
    def setUp(self):
 
1145
        env = os.environ.copy()
 
1146
        env['POLKIT_DEBUG'] = '1'
 
1147
        self.polkit = subprocess.Popen([self.polkit_path],
 
1148
                stdout=self.daemon_log, stderr=subprocess.STDOUT, env=env)
 
1149
        assert self.polkit.pid, 'polkitd failed to start'
 
1150
 
 
1151
    def tearDown(self):
 
1152
        os.kill(self.polkit.pid, signal.SIGTERM)
 
1153
        os.waitpid(self.polkit.pid, 0)
 
1154
 
 
1155
    def test_internal_fs_root(self):
 
1156
        '''Create FS on internal drive as root'''
 
1157
 
 
1158
        options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', 'polkitext4')})
 
1159
        self.fs_create(None, 'ext4', options)
 
1160
        block = self.udisks_block()
 
1161
        self.assertEqual(block.get_property('id-usage'), 'filesystem')
 
1162
        self.assertEqual(block.get_property('id-type'), 'ext4')
 
1163
        self.assertEqual(block.get_property('id-label'), 'polkitext4')
 
1164
 
 
1165
    def test_internal_fs_nobody(self):
 
1166
        '''Try to create FS on internal drive as nobody'''
 
1167
 
 
1168
        # ensure we have a mountable file system
 
1169
        self.fs_create(None, 'ext4', no_options)
 
1170
 
 
1171
        nobody = pwd.getpwnam('nobody')
 
1172
        assert nobody.pw_uid > 0
 
1173
 
 
1174
        # we cannot just change euid and do the call, as polkit will remember
 
1175
        # our process which started out as root; so call the external tool
 
1176
        tool_mount = subprocess.Popen([self.tool_path, 'mount',
 
1177
            '--no-user-interaction', '-b', self.device],
 
1178
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
 
1179
            universal_newlines=True,
 
1180
            preexec_fn=lambda: (os.setgid(nobody.pw_gid), os.setuid(nobody.pw_uid)))
 
1181
        out, err = tool_mount.communicate()
 
1182
        self.assertTrue('Error.NotAuthorized' in err, err)
 
1183
 
 
1184
# ----------------------------------------------------------------------------
 
1185
 
 
1186
if __name__ == '__main__':
 
1187
    argparser = argparse.ArgumentParser(description='udisks2 integration test suite')
 
1188
    argparser.add_argument('-l', '--log-file', dest='logfile',
 
1189
            help='write daemon log to a file')
 
1190
    argparser.add_argument('-w', '--no-workarounds',
 
1191
            action="store_true", default=False,
 
1192
            help='Disable workarounds for race conditions in the D-BUS API')
 
1193
    argparser.add_argument('testname', nargs='*',
 
1194
            help='name of test class or method (e. g. "Drive", "FS.test_ext2")')
 
1195
    args = argparser.parse_args()
 
1196
 
 
1197
    workaround_syncs = not args.no_workarounds
 
1198
 
 
1199
    UDisksTestCase.init(logfile=args.logfile)
 
1200
    if args.testname:
 
1201
        tests = unittest.TestLoader().loadTestsFromNames(args.testname,
 
1202
                __import__('__main__'))
 
1203
    else:
 
1204
        tests = unittest.TestLoader().loadTestsFromName('__main__')
 
1205
    if unittest.TextTestRunner(verbosity=2).run(tests).wasSuccessful():
 
1206
        sys.exit(0)
 
1207
    else:
 
1208
        sys.exit(1)
 
1209