~psusi/ubuntu/utopic/udisks2/fix-standby

« back to all changes in this revision

Viewing changes to .pc/00git_nonexisting_run_udev_rules.d.patch/src/tests/integration-test

  • Committer: Package Import Robot
  • Author(s): Martin Pitt
  • Date: 2013-05-22 15:27:56 UTC
  • mfrom: (14.1.5 saucy-proposed)
  • Revision ID: package-import@ubuntu.com-20130522152756-r7wn9l9wilgycocp
Tags: 2.1.0-3git1
Upload current Debian packaging git head to fix autopkgtest.

* Add 00git_nonexisting_run_udev_rules.d.patch: integration-test: Fix for
  nonexisting /run/udev/rules.d/.
* Add 00git_vfat_test_case_insensitivity.patch: integration-test: For VFAT,
  ignore case for label comparison.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/python3
 
2
#
 
3
# udisks2 integration test suite
 
4
#
 
5
# Run in udisks built tree to test local built binaries (needs
 
6
# --localstatedir=/var), or from anywhere else to test system installed
 
7
# binaries. 
 
8
#
 
9
# Usage:
 
10
# - Run all tests: 
 
11
#   src/tests/integration-test
 
12
# - Run only a particular class of tests:
 
13
#   src/tests/integration-test Drive
 
14
# - Run only a single test:
 
15
#   src/tests/integration-test FS.test_ext3
 
16
#
 
17
# Copyright: (C) 2011 Martin Pitt <martin.pitt@ubuntu.com>
 
18
#
 
19
# This program is free software; you can redistribute it and/or modify
 
20
# it under the terms of the GNU General Public License as published by
 
21
# the Free Software Foundation; either version 2 of the License, or
 
22
# (at your option) any later version.
 
23
#
 
24
# This program is distributed in the hope that it will be useful,
 
25
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
26
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
27
# GNU General Public License for more details.
 
28
 
 
29
# TODO:
 
30
# - add and test method for changing LUKS passphrase
 
31
# - test Format with take-ownership
 
32
 
 
33
import sys
 
34
import os
 
35
import pwd
 
36
 
 
37
srcdir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
 
38
libdir = os.path.join(srcdir, 'udisks', '.libs')
 
39
 
 
40
# as we can't change LD_LIBRARY_PATH within a running program, and doing
 
41
# #!/usr/bin/env LD_LIBRARY_PATH=... python3 does not work either, do this
 
42
# nasty hack
 
43
if 'LD_LIBRARY_PATH' not in os.environ and os.path.isdir(libdir):
 
44
    os.environ['LD_LIBRARY_PATH'] = libdir
 
45
    os.environ['GI_TYPELIB_PATH'] = '%s/udisks:%s' % (
 
46
            srcdir,
 
47
            os.environ.get('GI_TYPELIB_PATH', ''))
 
48
    os.execv(sys.argv[0], sys.argv)
 
49
    assert False, 'not expecting to land here'
 
50
 
 
51
import subprocess
 
52
import unittest
 
53
import tempfile
 
54
import atexit
 
55
import time
 
56
import shutil
 
57
import signal
 
58
import argparse
 
59
import re
 
60
from glob import glob
 
61
from gi.repository import GLib, Gio, UDisks
 
62
 
 
63
# find local test_polkit.py
 
64
sys.path.insert(0, os.path.dirname(__file__))
 
65
import test_polkitd
 
66
 
 
67
#GI_TYPELIB_PATH=udisks LD_LIBRARY_PATH=udisks/.libs
 
68
VDEV_SIZE = 300000000 # size of virtual test device
 
69
 
 
70
# Those file systems are known to have a broken handling of permissions, in
 
71
# particular the executable bit
 
72
BROKEN_PERMISSIONS_FS = ['ntfs']
 
73
 
 
74
# Some D-BUS API methods cause properties to not be up to date yet when a
 
75
# method call finishes, thus we do an udevadm settle as a workaround. Those
 
76
# methods should eventually get fixed properly, but it's unnerving to have
 
77
# the tests fail on them when you are working on something else. This flag
 
78
# gets set by the --no-workarounds option to disable those syncs, so that these
 
79
# race conditions can be fixed.
 
80
workaround_syncs = False
 
81
 
 
82
no_options = GLib.Variant('a{sv}', {})
 
83
 
 
84
# ----------------------------------------------------------------------------
 
85
 
 
86
class UDisksTestCase(unittest.TestCase):
 
87
    '''Base class for udisks test cases.
 
88
    
 
89
    This provides static functions which are useful for all test cases.
 
90
    '''
 
91
    daemon = None
 
92
    daemon_path = None
 
93
    daemon_log = None
 
94
    device = None
 
95
 
 
96
    client = None
 
97
    manager = None
 
98
 
 
99
    @classmethod
 
100
    def init(klass, logfile=None):
 
101
        '''start daemon and set up test environment'''
 
102
 
 
103
        if os.geteuid() != 0:
 
104
            print('this test suite needs to run as root', file=sys.stderr)
 
105
            sys.exit(0)
 
106
 
 
107
        # run from local build tree if we are in one, otherwise use system instance
 
108
        klass.daemon_path = os.path.join(srcdir, 'src', 'udisksd')
 
109
        if (os.access (klass.daemon_path, os.X_OK)):
 
110
            print('Testing binaries from local build tree')
 
111
            klass.check_build_tree_config()
 
112
        else:
 
113
            print('Testing installed system binaries')
 
114
            klass.daemon_path = None
 
115
            for l in open('/usr/share/dbus-1/system-services/org.freedesktop.UDisks2.service'):
 
116
                if l.startswith('Exec='):
 
117
                    klass.daemon_path = l.split('=', 1)[1].split()[0]
 
118
                    break
 
119
            assert klass.daemon_path, 'could not determine daemon path from D-BUS .service file'
 
120
 
 
121
        print('daemon path: ' + klass.daemon_path)
 
122
 
 
123
        (klass.device, klass.cd_device) = klass.setup_vdev()
 
124
 
 
125
        # start polkit and udisks on a private DBus
 
126
        klass.dbus = Gio.TestDBus()
 
127
        klass.dbus.up()
 
128
        os.environ['DBUS_SYSTEM_BUS_ADDRESS'] = klass.dbus.get_bus_address()
 
129
        # do not try to communicate with the current desktop session; this will
 
130
        # confuse it, as it cannot see this D-BUS instance
 
131
        try:
 
132
            del os.environ['DISPLAY']
 
133
        except KeyError:
 
134
            pass
 
135
        if logfile:
 
136
            klass.daemon_log = open(logfile, 'w')
 
137
        else:
 
138
            klass.daemon_log = tempfile.TemporaryFile()
 
139
        atexit.register(klass.cleanup)
 
140
 
 
141
        klass.start_daemon()
 
142
 
 
143
    @classmethod
 
144
    def cleanup(klass):
 
145
        '''stop daemon again and clean up test environment'''
 
146
 
 
147
        subprocess.call(['umount', klass.device], stderr=subprocess.PIPE) # if a test failed
 
148
 
 
149
        klass.stop_daemon()
 
150
 
 
151
        klass.teardown_vdev(klass.device)
 
152
        klass.device = None
 
153
 
 
154
        del os.environ['DBUS_SYSTEM_BUS_ADDRESS']
 
155
        klass.dbus.down()
 
156
 
 
157
    @classmethod
 
158
    def start_daemon(klass):
 
159
        assert klass.daemon == None
 
160
        klass.daemon = subprocess.Popen([klass.daemon_path, '--replace'],
 
161
            stdout=klass.daemon_log, stderr=subprocess.STDOUT)
 
162
        assert klass.daemon.pid, 'daemon failed to start'
 
163
 
 
164
        # wait until the daemon has started up
 
165
        timeout = 10
 
166
        klass.manager = None
 
167
        while klass.manager is None and timeout > 0:
 
168
            time.sleep(0.2)
 
169
            klass.client = UDisks.Client.new_sync(None)
 
170
            assert klass.client != None
 
171
            klass.manager = klass.client.get_manager()
 
172
            timeout -= 1
 
173
        assert klass.manager, 'daemon failed to start'
 
174
        assert klass.daemon.pid, 'daemon failed to start'
 
175
 
 
176
        klass.sync()
 
177
 
 
178
    @classmethod
 
179
    def stop_daemon(klass):
 
180
        assert klass.daemon
 
181
        os.kill(klass.daemon.pid, signal.SIGTERM)
 
182
        os.wait()
 
183
        klass.daemon = None
 
184
 
 
185
    @classmethod
 
186
    def sync(klass):
 
187
        '''Wait until pending events finished processing.
 
188
        
 
189
        This should only be called for situations where we genuinely have an
 
190
        asynchronous response, like invoking a CLI program and waiting for
 
191
        udev/udisks to catch up on the change events.
 
192
        '''
 
193
        subprocess.call(['udevadm', 'settle'])
 
194
        context = GLib.main_context_default()
 
195
        timeout = 100
 
196
        # wait until all GDBus events have been processed
 
197
        while context.pending() and timeout > 0:
 
198
            klass.client.settle()
 
199
            time.sleep(0.1)
 
200
            timeout -= 1
 
201
        if timeout <= 0:
 
202
            sys.stderr.write('[wait timeout!] ')
 
203
            sys.stderr.flush()
 
204
        klass.client.settle()
 
205
 
 
206
    @classmethod
 
207
    def sync_workaround(klass):
 
208
        '''Wait until pending events finished processing (bug workaround).
 
209
        
 
210
        This should be called for race conditions in the D-BUS API which cause
 
211
        properties to not be up to date yet when a method call finishes. Those
 
212
        should eventually get fixed properly, but it's unnerving to have the
 
213
        tests fail on them when you are working on something else.
 
214
 
 
215
        This sync is not done if running with --no-workarounds.
 
216
        '''
 
217
        if workaround_syncs:
 
218
            klass.sync()
 
219
 
 
220
    @classmethod
 
221
    def zero_device(klass):
 
222
        subprocess.call(['dd', 'if=/dev/zero', 'of='+klass.device, 'bs=10M'],
 
223
                stderr=subprocess.PIPE)
 
224
        time.sleep(0.5)
 
225
        klass.sync()
 
226
 
 
227
    @classmethod
 
228
    def devname(klass, partition=None, cd=False):
 
229
        '''Get name of test device or one of its partitions
 
230
 
 
231
        If cd is True, return the CD device, otherwise the hard disk device.
 
232
        '''
 
233
        if cd:
 
234
            dev = klass.cd_device
 
235
        else:
 
236
            dev = klass.device
 
237
        if partition:
 
238
            if dev[-1].isdigit():
 
239
                return dev + 'p' + str(partition)
 
240
            else:
 
241
                return dev + str(partition)
 
242
        else:
 
243
            return dev
 
244
 
 
245
    @classmethod
 
246
    def udisks_block(klass, partition=None, cd=False):
 
247
        '''Get UDisksBlock object for test device or partition
 
248
 
 
249
        If cd is True, return the CD device, otherwise the hard disk device.
 
250
        '''
 
251
        assert klass.client
 
252
        devname = klass.devname(partition, cd)
 
253
        dev_t = os.stat(devname).st_rdev
 
254
        block = klass.client.get_block_for_dev(dev_t)
 
255
        assert block, 'did not find an UDisksBlock object for %s' % devname
 
256
        return block
 
257
 
 
258
    @classmethod
 
259
    def udisks_filesystem(klass, partition=None, cd=False):
 
260
        '''Get UDisksFilesystem object for test device or partition
 
261
        
 
262
        Return None if there is no file system on that device.
 
263
 
 
264
        If cd is True, return the CD device, otherwise the hard disk device.
 
265
        '''
 
266
        block = klass.udisks_block(partition, cd)
 
267
        return klass.client.get_object(block.get_object_path()).get_filesystem()
 
268
 
 
269
    @classmethod
 
270
    def blkid(klass, partition=None, device=None):
 
271
        '''Call blkid and return dictionary of results.'''
 
272
 
 
273
        if not device:
 
274
            device = klass.devname(partition)
 
275
        result = {}
 
276
        cmd = subprocess.Popen(['blkid', '-p', '-o', 'udev', device], stdout=subprocess.PIPE)
 
277
        for l in cmd.stdout:
 
278
            (key, value) = l.decode('UTF-8').split('=', 1)
 
279
            result[key] = value.strip()
 
280
        assert cmd.wait() == 0
 
281
        return result
 
282
 
 
283
    @classmethod
 
284
    def is_mountpoint(klass, path):
 
285
        '''Check if given path is a mount point.'''
 
286
 
 
287
        return subprocess.call(['mountpoint', path], stdout=subprocess.PIPE) == 0
 
288
 
 
289
    @classmethod
 
290
    def mkfs(klass, type, label=None, partition=None):
 
291
        '''Create file system using mkfs.'''
 
292
 
 
293
        if type == 'minix':
 
294
            assert label is None, 'minix does not support labels'
 
295
 
 
296
        # work around mkswap not properly cleaning up an existing reiserfs
 
297
        # signature (mailed kzak about it)
 
298
        if type == 'swap':
 
299
            subprocess.check_call(['wipefs', '-a', klass.devname(partition)],
 
300
                    stdout=subprocess.PIPE)
 
301
 
 
302
        mkcmd =     { 'swap': 'mkswap',
 
303
                      'ntfs': 'mkntfs',
 
304
                    }
 
305
        label_opt = { 'vfat': '-n', 
 
306
                      'reiserfs': '-l',
 
307
                    }
 
308
        extra_opt = { 'vfat': [ '-I', '-F', '32'],
 
309
                      'swap': ['-f'],
 
310
                      'xfs': ['-f'], # XFS complains if there's an existing FS, so force
 
311
                      'ext2': ['-F'], # ext* complains about using entire device, so force
 
312
                      'ext3': ['-F'],
 
313
                      'ext4': ['-F'],
 
314
                      'ntfs': ['-F'],
 
315
                      'reiserfs': ['-ff'],
 
316
                    }
 
317
 
 
318
        cmd = [mkcmd.get(type, 'mkfs.' + type)] + extra_opt.get(type, [])
 
319
        if label:
 
320
            cmd += [label_opt.get(type, '-L'), label]
 
321
        cmd.append(klass.devname(partition))
 
322
 
 
323
        subprocess.check_call(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
324
 
 
325
        # kernel/udev generally detect those changes itself, but do not quite
 
326
        # tell us when they are done; so do a little kludge here to know how
 
327
        # long we need to wait
 
328
        subprocess.call(['udevadm', 'trigger', '--action=change',
 
329
            '--sysname-match=' + os.path.basename(klass.devname(partition))])
 
330
        klass.sync()
 
331
 
 
332
    @classmethod
 
333
    def fs_create(klass, partition, type, options):
 
334
        '''Create file system using udisks.'''
 
335
 
 
336
        block = klass.udisks_block(partition)
 
337
        block.call_format_sync(type, options, None)
 
338
        klass.sync_workaround()
 
339
 
 
340
    @classmethod
 
341
    def retry_busy(klass, fn, *args):
 
342
        '''Call a function until it does not fail with "Busy".'''
 
343
 
 
344
        timeout = 10
 
345
        while timeout >= 0:
 
346
            try:
 
347
                return fn(*args)
 
348
            except GLib.GError as e:
 
349
                if not 'UDisks2.Error.DeviceBusy' in e.message:
 
350
                    raise
 
351
                sys.stderr.write('[busy] ')
 
352
                time.sleep(0.3)
 
353
                timeout -= 1
 
354
 
 
355
    @classmethod
 
356
    def check_build_tree_config(klass):
 
357
        '''Check configuration of build tree'''
 
358
 
 
359
        # read make variables
 
360
        make_vars = {}
 
361
        var_re = re.compile('^([a-zA-Z_]+) = (.*)$')
 
362
        make = subprocess.Popen(['make', '-p', '/dev/null'],
 
363
                stdout=subprocess.PIPE)
 
364
        for l in make.stdout:
 
365
            l = l.decode('UTF-8')
 
366
            m = var_re.match(l)
 
367
            if m:
 
368
                make_vars[m.group(1)] = m.group(2)
 
369
        make.wait()
 
370
 
 
371
        # expand make variables
 
372
        subst_re = re.compile('\${([a-zA-Z_]+)}')
 
373
        for (k, v) in make_vars.items():
 
374
            while True:
 
375
                m = subst_re.search(v)
 
376
                if m:
 
377
                    v = subst_re.sub(make_vars.get(m.group(1), ''), v)
 
378
                    make_vars[k] = v
 
379
                else:
 
380
                    break
 
381
 
 
382
        # check localstatedir
 
383
        for d in (os.path.join(make_vars['localstatedir'], 'run', 'udisks2'),
 
384
                os.path.join(make_vars['localstatedir'], 'lib', 'udisks2')):
 
385
            if not os.path.exists(d):
 
386
                sys.stderr.write('The directory %s does not exist; please create it before running these tests.\n' % d)
 
387
                sys.exit(0)
 
388
        
 
389
    @classmethod
 
390
    def setup_vdev(klass):
 
391
        '''create virtual test devices
 
392
        
 
393
        It is zeroed out initially.
 
394
 
 
395
        Return a pair (writable HD device path, readonly CD device path).
 
396
        '''
 
397
        # ensure that the scsi_debug module is loaded
 
398
        if os.path.isdir('/sys/module/scsi_debug'):
 
399
            sys.stderr.write('The scsi_debug module is already loaded; please remove before running this test.\n')
 
400
            sys.exit(1)
 
401
 
 
402
        # work around scsi_debug not implementing CD-ROM SCSI commands, so that
 
403
        # udev's cdrom_id does not recognize tracks
 
404
        scsi_debug_rules = '/run/udev/rules.d/60-persistent-storage-scsi_debug.rules'
 
405
        if os.path.isdir('/run/udev/rules.d') and not os.path.exists(scsi_debug_rules):
 
406
            with open(scsi_debug_rules, 'w') as f:
 
407
                f.write('''KERNEL=="sr*", ENV{DISK_EJECT_REQUEST}!="?*", ATTRS{model}=="scsi_debug*", ENV{ID_CDROM_MEDIA}=="?*", IMPORT{program}="/sbin/blkid -o udev -p -u noraid $tempnode"
 
408
''')
 
409
            # reload udev
 
410
            subprocess.call('sync; pkill --signal HUP udevd || pkill --signal HUP systemd-udevd',
 
411
                            shell=True)
 
412
 
 
413
        # craete a fake SCSI hard drive
 
414
        assert subprocess.call(['modprobe', 'scsi_debug', 'dev_size_mb=%i' % (
 
415
            VDEV_SIZE/1048576)]) == 0, 'Failure to modprobe scsi_debug'
 
416
 
 
417
        # wait until drive got created
 
418
        rw_dirs = []
 
419
        while len(rw_dirs) < 1:
 
420
            rw_dirs = glob('/sys/bus/pseudo/drivers/scsi_debug/adapter*/host*/target*/*:*/block')
 
421
            time.sleep(0.1)
 
422
        assert len(rw_dirs) == 1
 
423
 
 
424
        # create a fake CD-ROM, too
 
425
        with open('/sys/bus/pseudo/drivers/scsi_debug/ptype', 'w') as f:
 
426
            f.write('5')  # henceforth, created devices will be CD drives
 
427
        with open('/sys/bus/pseudo/drivers/scsi_debug/add_host', 'w') as f:
 
428
            f.write('1')  # generate a new drive
 
429
        subprocess.call(['udevadm', 'settle'])
 
430
        with open('/sys/bus/pseudo/drivers/scsi_debug/ptype', 'w') as f:
 
431
            f.write('0')
 
432
 
 
433
        ro_dirs = []
 
434
        while len(ro_dirs) < 2:
 
435
            ro_dirs = glob('/sys/bus/pseudo/drivers/scsi_debug/adapter*/host*/target*/*:*/block')
 
436
            time.sleep(0.1)
 
437
        ro_dirs.remove(rw_dirs[0])
 
438
        assert len(ro_dirs) == 1
 
439
 
 
440
        # determine the debug block devices
 
441
        devs = os.listdir(ro_dirs[0])
 
442
        assert len(devs) == 1
 
443
        ro_dev = '/dev/' + devs[0]
 
444
        devs = os.listdir(rw_dirs[0])
 
445
        assert len(devs) == 1
 
446
        rw_dev = '/dev/' + devs[0]
 
447
        assert os.path.exists(ro_dev)
 
448
        assert os.path.exists(rw_dev)
 
449
 
 
450
        # let's be 100% sure that we pick a virtual one
 
451
        assert open('/sys/block/%s/device/model' % os.path.basename(rw_dev)).read().strip() == 'scsi_debug'
 
452
 
 
453
        print('Set up test device: r/w: %s, r/o: %s' % (rw_dev, ro_dev))
 
454
        return (rw_dev, ro_dev)
 
455
 
 
456
    @classmethod
 
457
    def teardown_vdev(klass, device):
 
458
        '''release and remove virtual test device'''
 
459
 
 
460
        klass.remove_device(device)
 
461
        assert subprocess.call(['rmmod', 'scsi_debug']) == 0, \
 
462
                'Failure to rmmod scsi_debug'
 
463
 
 
464
    @classmethod
 
465
    def remove_device(klass, device):
 
466
        '''remove virtual test device'''
 
467
 
 
468
        device = device.split('/')[-1]
 
469
        if os.path.exists('/sys/block/' + device):
 
470
            f = open('/sys/block/%s/device/delete' % device, 'w')
 
471
            f.write('1')
 
472
            f.close()
 
473
        while os.path.exists(device):
 
474
            time.sleep(0.1)
 
475
        klass.sync()
 
476
        time.sleep(0.5) # TODO
 
477
 
 
478
    @classmethod
 
479
    def readd_devices(klass):
 
480
        '''re-add virtual test devices after removal'''
 
481
 
 
482
        scan_files = glob('/sys/bus/pseudo/devices/adapter*/host*/scsi_host/host*/scan')
 
483
        assert len(scan_files) > 0
 
484
        for f in scan_files:
 
485
            open(f, 'w').write('- - -\n')
 
486
        while not os.path.exists(klass.device):
 
487
            time.sleep(0.1)
 
488
        time.sleep(0.5)
 
489
        klass.sync()
 
490
 
 
491
# ----------------------------------------------------------------------------
 
492
 
 
493
class Manager(UDisksTestCase):
 
494
    '''UDisksManager operations'''
 
495
 
 
496
    def test_version(self):
 
497
        '''daemon version'''
 
498
 
 
499
        self.assertTrue(self.manager.get_property('version')[0].isdigit())
 
500
 
 
501
    def test_loop_rw(self):
 
502
        '''loop device R/W'''
 
503
 
 
504
        with tempfile.NamedTemporaryFile() as f:
 
505
            f.truncate(100000000)
 
506
            fd_list = Gio.UnixFDList.new_from_array([f.fileno()])
 
507
 
 
508
            (path, out_fd_list) = self.manager.call_loop_setup_sync(
 
509
                GLib.Variant('h', 0), # fd index
 
510
                no_options,
 
511
                fd_list,
 
512
                None)
 
513
            self.client.settle()
 
514
 
 
515
            obj = self.client.get_object(path)
 
516
            loop = obj.get_property('loop')
 
517
            block = obj.get_property('block')
 
518
            self.assertNotEqual(block, None)
 
519
            self.assertNotEqual(loop, None)
 
520
            self.assertEqual(obj.get_property('filesystem'), None)
 
521
 
 
522
            try:
 
523
                self.assertEqual(loop.get_property('backing-file'), f.name)
 
524
 
 
525
                options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', 'foo')})
 
526
                block.call_format_sync('ext2', options, None)
 
527
                self.client.settle()
 
528
                self.assertNotEqual(obj.get_property('filesystem'), None)
 
529
 
 
530
                self.assertEqual(block.get_property('id-label'), 'foo')
 
531
                self.assertEqual(block.get_property('id-usage'), 'filesystem')
 
532
                self.assertEqual(block.get_property('id-type'), 'ext2')
 
533
            finally:
 
534
                loop.call_delete_sync(no_options, None)
 
535
 
 
536
    def test_loop_ro(self):
 
537
        '''loop device R/O'''
 
538
 
 
539
        with tempfile.NamedTemporaryFile() as f:
 
540
            f.truncate(100000000)
 
541
            fd_list = Gio.UnixFDList.new_from_array([f.fileno()])
 
542
 
 
543
            (path, out_fd_list) = self.manager.call_loop_setup_sync(
 
544
                GLib.Variant('h', 0), # fd index
 
545
                GLib.Variant('a{sv}', {'read-only': GLib.Variant('b', True)}),
 
546
                fd_list,
 
547
                None)
 
548
            self.client.settle()
 
549
 
 
550
            obj = self.client.get_object(path)
 
551
            loop = obj.get_property('loop')
 
552
            block = obj.get_property('block')
 
553
            self.assertNotEqual(block, None)
 
554
            self.assertNotEqual(loop, None)
 
555
            self.assertEqual(obj.get_property('filesystem'), None)
 
556
 
 
557
            try:
 
558
                self.assertEqual(loop.get_property('backing-file'), f.name)
 
559
 
 
560
                # can't format due to permission error
 
561
                self.assertRaises(GLib.GError, block.call_format_sync, 'ext2',
 
562
                        no_options, None)
 
563
 
 
564
                self.assertEqual(block.get_property('id-label'), '')
 
565
                self.assertEqual(block.get_property('id-usage'), '')
 
566
                self.assertEqual(block.get_property('id-type'), '')
 
567
            finally:
 
568
                self.client.settle()
 
569
                loop.call_delete_sync(no_options, None)
 
570
 
 
571
# ----------------------------------------------------------------------------
 
572
 
 
573
class Drive(UDisksTestCase):
 
574
    '''UDisksDrive'''
 
575
 
 
576
    def setUp(self):
 
577
        self.drive = self.client.get_drive_for_block(self.udisks_block())
 
578
        self.assertNotEqual(self.drive, None)
 
579
 
 
580
    def test_properties(self):
 
581
        '''properties of UDisksDrive object'''
 
582
 
 
583
        self.assertEqual(self.drive.get_property('model'), 'scsi_debug')
 
584
        self.assertEqual(self.drive.get_property('vendor'), 'Linux')
 
585
        self.assertAlmostEqual(self.drive.get_property('size')/1.e6, VDEV_SIZE/1.e6, 0)
 
586
        self.assertEqual(self.drive.get_property('media-available'), True)
 
587
        self.assertEqual(self.drive.get_property('optical'), False)
 
588
 
 
589
        self.assertNotEqual(len(self.drive.get_property('serial')), 0)
 
590
        self.assertNotEqual(len(self.drive.get_property('revision')), 0)
 
591
 
 
592
# ----------------------------------------------------------------------------
 
593
 
 
594
class FS(UDisksTestCase):
 
595
    '''Test detection of all supported file systems'''
 
596
 
 
597
    def setUp(self):
 
598
        self.workdir = tempfile.mkdtemp()
 
599
        self.block = self.udisks_block()
 
600
        self.assertNotEqual(self.block, None)
 
601
 
 
602
    def tearDown(self):
 
603
        if subprocess.call(['umount', self.device], stderr=subprocess.PIPE) == 0:
 
604
            sys.stderr.write('[cleanup unmount] ')
 
605
        shutil.rmtree (self.workdir)
 
606
 
 
607
    def test_zero(self):
 
608
        '''properties of zeroed out device'''
 
609
 
 
610
        self.zero_device()
 
611
        self.assertEqual(self.block.get_property('device'), self.device)
 
612
        self.assertTrue('Linux_scsi_debug' in self.block.get_property('drive'))
 
613
        self.assertEqual(self.block.get_property('hint-system'), True)
 
614
        self.assertEqual(self.block.get_property('id-label'), '')
 
615
        self.assertEqual(self.block.get_property('id-usage'), '')
 
616
        self.assertEqual(self.block.get_property('id-type'), '')
 
617
        self.assertEqual(self.block.get_property('id-uuid'), '')
 
618
        self.assertAlmostEqual(self.block.get_property('size')/1.e6, VDEV_SIZE/1.e6, 0)
 
619
        obj = self.client.get_object(self.block.get_object_path())
 
620
        self.assertEqual(obj.get_property('filesystem'), None)
 
621
        self.assertEqual(obj.get_property('partition'), None)
 
622
        self.assertEqual(obj.get_property('partition-table'), None)
 
623
 
 
624
    def test_ext2(self):
 
625
        '''fs: ext2'''
 
626
        self._do_fs_check('ext2')
 
627
 
 
628
    def test_ext3(self):
 
629
        '''fs: ext3'''
 
630
        self._do_fs_check('ext3')
 
631
 
 
632
    def test_ext4(self):
 
633
        '''fs: ext4'''
 
634
        self._do_fs_check('ext4')
 
635
 
 
636
    def test_btrfs(self):
 
637
        '''fs: btrfs'''
 
638
        self._do_fs_check('btrfs')
 
639
 
 
640
    def test_minix(self):
 
641
        '''fs: minix'''
 
642
        self._do_fs_check('minix')
 
643
 
 
644
    def test_xfs(self):
 
645
        '''fs: XFS'''
 
646
        self._do_fs_check('xfs')
 
647
 
 
648
    def test_ntfs(self):
 
649
        '''fs: NTFS'''
 
650
        self._do_fs_check('ntfs')
 
651
 
 
652
    def test_vfat(self):
 
653
        '''fs: FAT'''
 
654
        self._do_fs_check('vfat')
 
655
 
 
656
    def test_reiserfs(self):
 
657
        '''fs: reiserfs'''
 
658
        self._do_fs_check('reiserfs')
 
659
 
 
660
    def test_swap(self):
 
661
        '''fs: swap'''
 
662
        self._do_fs_check('swap')
 
663
 
 
664
    def test_nilfs2(self):
 
665
        '''fs: nilfs2'''
 
666
        self._do_fs_check('nilfs2')
 
667
 
 
668
    def test_empty(self):
 
669
        '''fs: empty'''
 
670
 
 
671
        self.mkfs('ext4', 'foo')
 
672
        block = self.udisks_block()
 
673
        self.assertEqual(block.get_property('id-usage'), 'filesystem')
 
674
        self.assertEqual(block.get_property('id-type'), 'ext4')
 
675
        self.assertEqual(block.get_property('id-label'), 'foo')
 
676
        self.assertNotEqual(self.udisks_filesystem(), None)
 
677
 
 
678
        self.fs_create(None, 'empty', no_options)
 
679
 
 
680
        self.assertEqual(block.get_property('id-usage'), '')
 
681
        self.assertEqual(block.get_property('id-type'), '')
 
682
        self.assertEqual(block.get_property('id-label'), '')
 
683
        self.assertEqual(self.udisks_filesystem(), None)
 
684
 
 
685
    def test_create_fs_unknown_type(self):
 
686
        '''Format() with unknown type'''
 
687
 
 
688
        try:
 
689
            self.fs_create(None, 'bogus', no_options)
 
690
            self.fail('Expected failure for bogus file system')
 
691
        except GLib.GError as e:
 
692
            self.assertTrue('UDisks2.Error.NotSupported' in e.message)
 
693
            self.assertTrue('type bogus' in e.message)
 
694
 
 
695
    def test_create_fs_unsupported_label(self):
 
696
        '''Format() with unsupported label'''
 
697
 
 
698
        options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', 'foo')})
 
699
        try:
 
700
            self.fs_create(None, 'minix', options)
 
701
            self.fail('Expected failure for unsupported label')
 
702
        except GLib.GError as e:
 
703
            self.assertTrue('UDisks2.Error.NotSupported' in e.message)
 
704
 
 
705
    def test_force_removal(self):
 
706
        '''fs: forced removal'''
 
707
 
 
708
        # create a fs and mount it
 
709
        self.mkfs('ext4', 'udiskstest')
 
710
        fs = self.udisks_filesystem()
 
711
        mount_path = fs.call_mount_sync(no_options, None)
 
712
        self.assertTrue(mount_path.endswith('udiskstest'))
 
713
        self.assertTrue('/media/' in mount_path)
 
714
        self.assertTrue(self.is_mountpoint(mount_path))
 
715
 
 
716
        dev_t = os.stat(self.devname()).st_rdev
 
717
 
 
718
        # removal should clean up mounts
 
719
        self.remove_device(self.device)
 
720
        self.assertFalse(os.path.exists(mount_path))
 
721
        self.assertEqual(self.client.get_block_for_dev(dev_t), None)
 
722
 
 
723
        # after putting it back, it should be mountable again
 
724
        self.readd_devices()
 
725
        fs = self.udisks_filesystem()
 
726
        self.assertEqual(fs.get_property('mount-points'), [])
 
727
 
 
728
        mount_path = fs.call_mount_sync(no_options, None)
 
729
        self.assertTrue(mount_path.endswith('udiskstest'))
 
730
        self.assertTrue('/media/' in mount_path)
 
731
        self.assertTrue(self.is_mountpoint(mount_path))
 
732
        self.client.settle()
 
733
        self.assertEqual(fs.get_property('mount-points'), [mount_path])
 
734
 
 
735
        self.retry_busy(fs.call_unmount_sync, no_options, None)
 
736
        self.client.settle()
 
737
        self.assertEqual(fs.get_property('mount-points'), [])
 
738
 
 
739
    def test_existing_manual_mount_point(self):
 
740
        '''fs: does not reuse existing manual mount point'''
 
741
 
 
742
        self.mkfs('ext4', 'udiskstest')
 
743
        fs = self.udisks_filesystem()
 
744
 
 
745
        # mount it, determine mount path, and unmount again
 
746
        mount_path = fs.call_mount_sync(no_options, None)
 
747
        self.assertTrue(mount_path.endswith('udiskstest'))
 
748
 
 
749
        self.retry_busy(fs.call_unmount_sync, no_options, None)
 
750
        self.client.settle()
 
751
        self.assertEqual(fs.get_property('mount-points'), [])
 
752
 
 
753
        # cleans up mountpoint
 
754
        self.assertFalse(os.path.exists(mount_path))
 
755
 
 
756
        # now manually create the mount point
 
757
        os.mkdir(mount_path)
 
758
 
 
759
        # now this should use mount_path + '1'
 
760
        try:
 
761
            new_mount_path = fs.call_mount_sync(no_options, None)
 
762
            self.retry_busy(fs.call_unmount_sync, no_options, None)
 
763
            self.client.settle()
 
764
            self.assertEqual(fs.get_property('mount-points'), [])
 
765
            self.assertEqual(new_mount_path, mount_path + '1')
 
766
        finally:
 
767
            os.rmdir(mount_path)
 
768
 
 
769
    def test_existing_udisks_mount_point(self):
 
770
        '''fs: reuses existing udisks mount point'''
 
771
 
 
772
        self.mkfs('ext4', 'udiskstest')
 
773
        fs = self.udisks_filesystem()
 
774
 
 
775
        # mount it, determine mount path
 
776
        mount_path = fs.call_mount_sync(no_options, None)
 
777
        self.assertTrue(mount_path.endswith('udiskstest'))
 
778
 
 
779
        # stop the daemon (happens during a package upgrade)
 
780
        UDisksTestCase.stop_daemon()
 
781
 
 
782
        # mount should still be there; unmount it manually
 
783
        self.assertTrue(self.is_mountpoint(mount_path))
 
784
        subprocess.check_call(['umount', mount_path])
 
785
 
 
786
        # restart daemon, mount again; this should use the same mount point as
 
787
        # before
 
788
        UDisksTestCase.start_daemon()
 
789
        fs = self.udisks_filesystem()
 
790
        new_mount_path = fs.call_mount_sync(no_options, None)
 
791
        self.retry_busy(fs.call_unmount_sync, no_options, None)
 
792
        self.client.settle()
 
793
        self.assertEqual(fs.get_property('mount-points'), [])
 
794
        self.assertEqual(new_mount_path, mount_path)
 
795
 
 
796
    def _do_fs_check(self, type):
 
797
        '''Run checks for a particular file system.'''
 
798
        if type == 'ntfs':
 
799
            mkfs = 'mkntfs'
 
800
        else:
 
801
            mkfs = 'mkfs.' + type
 
802
 
 
803
        if type != 'swap' and subprocess.call(['which', mkfs],
 
804
                stdout=subprocess.PIPE) != 0:
 
805
            sys.stderr.write('[no %s, skip] ' % mkfs)
 
806
 
 
807
            # check correct D-Bus exception
 
808
            try:
 
809
                self.fs_create(None, type, no_options)
 
810
                self.fail('Expected failure for missing mkfs.' + type)
 
811
            except GLib.GError as e:
 
812
                self.assertTrue('UDisks2.Error.Failed' in e.message)
 
813
            return
 
814
 
 
815
        # do checks with command line tools (mkfs/mount/umount)
 
816
        sys.stderr.write('[cli] ')
 
817
        sys.stderr.flush()
 
818
 
 
819
        self._do_cli_check(type)
 
820
        if type != 'minix':
 
821
            self._do_cli_check(type, 'test%stst' % type)
 
822
 
 
823
        # put a different fs here instead of zeroing, so that we verify that
 
824
        # udisks overrides existing FS (e. g. XFS complains then), and does not
 
825
        # leave traces of other FS around
 
826
        if type == 'ext3':
 
827
            self.mkfs('swap')
 
828
        else:
 
829
            self.mkfs('ext3')
 
830
 
 
831
        # do checks with udisks operations
 
832
        sys.stderr.write('[ud] ')
 
833
        self._do_udisks_check(type)
 
834
        if type != 'minix':
 
835
            self._do_udisks_check(type, 'test%stst' % type)
 
836
            # also test fs_create with an empty label
 
837
            self._do_udisks_check(type, '')
 
838
 
 
839
    def _do_cli_check(self, type, label=None):
 
840
        '''udisks correctly picks up file system changes from command line tools'''
 
841
 
 
842
        self.mkfs(type, label)
 
843
 
 
844
        block = self.udisks_block()
 
845
 
 
846
        self.assertEqual(block.get_property('id-usage'), (type == 'swap') and 'other' or 'filesystem')
 
847
 
 
848
        self.assertEqual(block.get_property('id-type'), type)
 
849
        self.assertEqual(block.get_property('id-label'), label or '')
 
850
        self.assertEqual(block.get_property('hint-name'), '')
 
851
        if type != 'minix':
 
852
            self.assertEqual(block.get_property('id-uuid'), self.blkid()['ID_FS_UUID'])
 
853
 
 
854
        obj = self.client.get_object(self.block.get_object_path())
 
855
        self.assertEqual(obj.get_property('partition'), None)
 
856
        self.assertEqual(obj.get_property('partition-table'), None)
 
857
 
 
858
        fs = obj.get_property('filesystem')
 
859
        if type == 'swap':
 
860
            self.assertEqual(fs, None)
 
861
        else:
 
862
            self.assertNotEqual(fs, None)
 
863
 
 
864
        if type == 'swap':
 
865
            return
 
866
 
 
867
        # mount it
 
868
        if type == 'ntfs' and subprocess.call(['which', 'mount.ntfs-3g'],
 
869
                stdout=subprocess.PIPE) == 0:
 
870
            # prefer mount.ntfs-3g if we have it (on Debian; Ubuntu
 
871
            # defaults to ntfs-3g if installed); TODO: check other distros
 
872
            mount_prog = 'mount.ntfs-3g'
 
873
        else:
 
874
            mount_prog = 'mount'
 
875
        ret = subprocess.call([mount_prog, self.device, self.workdir])
 
876
        if ret == 32:
 
877
            # missing fs driver
 
878
            sys.stderr.write('[missing kernel driver, skip] ')
 
879
            return
 
880
        self.assertEqual(ret, 0)
 
881
 
 
882
        time.sleep(0.5)
 
883
        self.sync()
 
884
        self.assertEqual(fs.get_property('mount-points'), [self.workdir])
 
885
 
 
886
        # unmount it
 
887
        subprocess.call(['umount', self.workdir])
 
888
        self.sync()
 
889
        self.assertEqual(fs.get_property('mount-points'), [])
 
890
 
 
891
    def _do_udisks_check(self, type, label=None):
 
892
        '''udisks API correctly changes file system'''
 
893
 
 
894
        # create fs 
 
895
        if label is not None:
 
896
            options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', label)})
 
897
        else:
 
898
            options = no_options
 
899
        self.fs_create(None, type, options)
 
900
 
 
901
        # properties
 
902
        id = self.blkid()
 
903
        self.assertEqual(id['ID_FS_USAGE'], type == 'swap' and 'other' or 'filesystem')
 
904
        self.assertEqual(id['ID_FS_TYPE'], type)
 
905
        self.assertEqual(id.get('ID_FS_LABEL', ''), label or '')
 
906
 
 
907
        block = self.udisks_block()
 
908
        self.assertEqual(block.get_property('id-usage'), (type == 'swap') and 'other' or 'filesystem')
 
909
        self.assertEqual(block.get_property('id-type'), type)
 
910
        self.assertEqual(block.get_property('id-label'), label or '')
 
911
 
 
912
        if type == 'swap':
 
913
            return
 
914
 
 
915
        obj = self.client.get_object(self.block.get_object_path())
 
916
        self.assertEqual(obj.get_property('partition'), None)
 
917
        self.assertEqual(obj.get_property('partition-table'), None)
 
918
 
 
919
        fs = self.udisks_filesystem()
 
920
        self.assertNotEqual(fs, None, 'no Filesystem interface for test device')
 
921
        self.assertEqual(fs.get_property('mount-points'), [])
 
922
 
 
923
        # mount
 
924
        mount_path = fs.call_mount_sync(no_options, None)
 
925
 
 
926
        self.assertTrue('/media/' in mount_path)
 
927
        if label:
 
928
            self.assertTrue(mount_path.endswith(label))
 
929
 
 
930
        self.sync()
 
931
        self.assertEqual(fs.get_property('mount-points'), [mount_path])
 
932
        self.assertTrue(self.is_mountpoint(mount_path))
 
933
 
 
934
        # no ownership taken, should be root owned
 
935
        st = os.stat(mount_path)
 
936
        self.assertEqual((st.st_uid, st.st_gid), (0, 0))
 
937
 
 
938
        self._do_file_perms_checks(type, mount_path)
 
939
 
 
940
        # unmount
 
941
        self.retry_busy(fs.call_unmount_sync, no_options, None)
 
942
        self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
 
943
        self.assertEqual(fs.get_property('mount-points'), [mount_path])
 
944
 
 
945
        # create fs with taking ownership (daemon:mail == 1:8)
 
946
        #if supports_unix_owners:
 
947
        #    options.append('take_ownership_uid=1')
 
948
        #    options.append('take_ownership_gid=8')
 
949
        #    self.fs_create(None, type, options)
 
950
        #    mount_path = iface.FilesystemMount('', [])
 
951
        #    st = os.stat(mount_path)
 
952
        #    self.assertEqual((st.st_uid, st.st_gid), (1, 8))
 
953
        #    self.retry_busy(self.partition_iface().FilesystemUnmount, [])
 
954
        #    self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
 
955
 
 
956
        # change label
 
957
        supported = True
 
958
        l = 'n"a\m\\"e' + type
 
959
        if type == 'vfat':
 
960
            # VFAT does not support some characters
 
961
            self.assertRaises(GLib.GError, fs.call_set_label_sync, l, 
 
962
                    no_options, None)
 
963
            l = "n@a$me"
 
964
        try:
 
965
            fs.call_set_label_sync(l, no_options, None)
 
966
        except GLib.GError as e:
 
967
            if 'UDisks2.Error.NotSupported' in e.message:
 
968
                # these fses are known to not support relabeling
 
969
                self.assertTrue(type in ['minix', 'btrfs'])
 
970
                supported = False
 
971
            else:
 
972
                raise
 
973
 
 
974
        if supported:
 
975
            block = self.udisks_block()
 
976
            blkid_label = self.blkid().get('ID_FS_LABEL_ENC', '').replace('\\x22', '"').replace(
 
977
                    '\\x5c', '\\').replace('\\x24', '$')
 
978
            self.sync_workaround()
 
979
            if type == 'vfat':
 
980
                # EXFAIL: often (but not always) the label appears in all upper case
 
981
                self.assertEqual(blkid_label.upper(), l.upper())
 
982
                self.assertEqual(block.get_property('id-label').upper(), l.upper())
 
983
            else:
 
984
                self.assertEqual(blkid_label, l)
 
985
                self.assertEqual(block.get_property('id-label'), l)
 
986
 
 
987
            # test setting empty label
 
988
            fs.call_set_label_sync('', no_options, None)
 
989
            self.sync_workaround()
 
990
            self.assertEqual(self.blkid().get('ID_FS_LABEL_ENC', ''), '')
 
991
            self.assertEqual(block.get_property('id-label'), '')
 
992
 
 
993
        # check fs - Not implemented in udisks yet
 
994
        #self.assertEqual(iface.FilesystemCheck([]), True)
 
995
 
 
996
        # check mounting of a read-only device
 
997
        # this is known-broken for reiserfs and xfs right now:
 
998
        # https://github.com/karelzak/util-linux/issues/17
 
999
        # https://github.com/karelzak/util-linux/issues/18
 
1000
        if type not in ['reiserfs', 'xfs']:
 
1001
            # the scsi_debug CD drive content is the same as for the HD drive, but
 
1002
            # udev does not know about this; so give it a nudge to re-probe
 
1003
            subprocess.call(['udevadm', 'trigger', '--action=change',
 
1004
                '--sysname-match=' + os.path.basename(self.cd_device)])
 
1005
            self.sync()
 
1006
            self.sync()
 
1007
            cd_fs = self.udisks_filesystem(cd=True)
 
1008
 
 
1009
            mount_path = cd_fs.call_mount_sync(no_options, None)
 
1010
            try:
 
1011
                self.assertTrue('/media/' in mount_path)
 
1012
                self.sync()
 
1013
                self.assertEqual(cd_fs.get_property('mount-points'), [mount_path])
 
1014
                self.assertTrue(self.is_mountpoint(mount_path))
 
1015
 
 
1016
                self.assertEqual(self.udisks_block(cd=True).get_property('read-only'), True)
 
1017
            finally:
 
1018
                self.retry_busy(cd_fs.call_unmount_sync, no_options, None)
 
1019
                self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
 
1020
                self.assertEqual(cd_fs.get_property('mount-points'), [mount_path])
 
1021
 
 
1022
    def _do_file_perms_checks(self, type, mount_point):
 
1023
        '''Check for permissions for data files and executables.
 
1024
 
 
1025
        This particularly checks sane and useful permissions on non-Unix file
 
1026
        systems like vfat.
 
1027
        '''
 
1028
        if type in BROKEN_PERMISSIONS_FS:
 
1029
            return
 
1030
 
 
1031
        f = os.path.join(mount_point, 'simpledata.txt')
 
1032
        open(f, 'w').close()
 
1033
        self.assertTrue(os.access(f, os.R_OK))
 
1034
        self.assertTrue(os.access(f, os.W_OK))
 
1035
        self.assertFalse(os.access(f, os.X_OK))
 
1036
 
 
1037
        f = os.path.join(mount_point, 'simple.exe')
 
1038
        shutil.copy('/bin/bash', f)
 
1039
        self.assertTrue(os.access(f, os.R_OK))
 
1040
        self.assertTrue(os.access(f, os.W_OK))
 
1041
        self.assertTrue(os.access(f, os.X_OK))
 
1042
 
 
1043
        os.mkdir(os.path.join(mount_point, 'subdir'))
 
1044
        f = os.path.join(mount_point, 'subdir', 'subdirdata.txt')
 
1045
        open(f, 'w').close()
 
1046
        self.assertTrue(os.access(f, os.R_OK))
 
1047
        self.assertTrue(os.access(f, os.W_OK))
 
1048
        self.assertFalse(os.access(f, os.X_OK))
 
1049
 
 
1050
        f = os.path.join(mount_point, 'subdir', 'subdir.exe')
 
1051
        shutil.copy('/bin/bash', f)
 
1052
        self.assertTrue(os.access(f, os.R_OK))
 
1053
        self.assertTrue(os.access(f, os.W_OK))
 
1054
        self.assertTrue(os.access(f, os.X_OK))
 
1055
 
 
1056
## ----------------------------------------------------------------------------
 
1057
 
 
1058
class Smart(UDisksTestCase):
 
1059
    '''Check SMART operation.'''
 
1060
 
 
1061
    def test_sda(self):
 
1062
        '''SMART status of first internal hard disk
 
1063
        
 
1064
        This is a best-effort readonly test.
 
1065
        '''
 
1066
        hd = '/dev/sda'
 
1067
 
 
1068
        if not os.path.exists(hd):
 
1069
            sys.stderr.write('[skip] ')
 
1070
            return
 
1071
 
 
1072
        has_smart = subprocess.call(['skdump', '--can-smart', hd],
 
1073
                stdout=subprocess.PIPE, stderr=subprocess.STDOUT) == 0
 
1074
 
 
1075
        block = self.client.get_block_for_dev(os.stat(hd).st_rdev)
 
1076
        self.assertNotEqual(block, None)
 
1077
        drive = self.client.get_drive_for_block(block)
 
1078
        ata = self.client.get_object(drive.get_object_path()).get_property('drive-ata')
 
1079
        self.assertEqual(ata != None, has_smart)
 
1080
 
 
1081
        if has_smart:
 
1082
            sys.stderr.write('[avail] ')
 
1083
            self.assertEqual(ata.get_property('smart-supported'), True)
 
1084
            self.assertEqual(ata.get_property('smart-enabled'), True)
 
1085
 
 
1086
            # wait for SMART data to be read
 
1087
            while ata.get_property('smart-updated') == 0:
 
1088
                sys.stderr.write('[wait for data] ')
 
1089
                time.sleep(0.5)
 
1090
 
 
1091
            # this is of course not truly correct for a test suite, but let's
 
1092
            # consider it a courtesy for developers :-)
 
1093
            self.assertEqual(ata.get_property('smart-failing'), False)
 
1094
            self.assertTrue(ata.get_property('smart-selftest-status') in ['success', 'inprogress'])
 
1095
        else:
 
1096
            sys.stderr.write('[N/A] ')
 
1097
 
 
1098
 
 
1099
# ----------------------------------------------------------------------------
 
1100
 
 
1101
class Luks(UDisksTestCase):
 
1102
    '''Check LUKS.'''
 
1103
 
 
1104
    def tearDown(self):
 
1105
        '''clean up behind failed test cases'''
 
1106
 
 
1107
        crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
 
1108
        if crypt_obj:
 
1109
            encrypted = crypt_obj.get_property('encrypted')
 
1110
            if encrypted:
 
1111
                try:
 
1112
                    encrypted.call_lock_sync(no_options, None)
 
1113
                    sys.stderr.write('[cleanup lock] ')
 
1114
                except GLib.GError:
 
1115
                    pass
 
1116
 
 
1117
    # needs to run before the other tests
 
1118
    def test_0_create_teardown(self):
 
1119
        '''LUKS create/teardown'''
 
1120
 
 
1121
        self.fs_create(None, 'ext4', GLib.Variant('a{sv}', {
 
1122
            'encrypt.passphrase': GLib.Variant('s', 's3kr1t'),
 
1123
            'label': GLib.Variant('s', 'treasure'),
 
1124
            }))
 
1125
 
 
1126
        try:
 
1127
            block = self.udisks_block()
 
1128
            obj = self.client.get_object(block.get_object_path())
 
1129
            self.assertEqual(obj.get_property('filesystem'), None)
 
1130
            encrypted = obj.get_property('encrypted')
 
1131
            self.assertNotEqual(encrypted, None)
 
1132
 
 
1133
            # check crypted device info
 
1134
            self.assertEqual(block.get_property('id-type'), 'crypto_LUKS')
 
1135
            self.assertEqual(block.get_property('id-usage'), 'crypto')
 
1136
            self.assertEqual(block.get_property('id-label'), '')
 
1137
            self.assertEqual(block.get_property('id-uuid'), self.blkid()['ID_FS_UUID'])
 
1138
            self.assertEqual(block.get_property('device'), self.devname())
 
1139
 
 
1140
            # check whether we can lock/unlock; we also need this to get the
 
1141
            # cleartext device
 
1142
            encrypted.call_lock_sync(no_options, None)
 
1143
            self.assertRaises(GLib.GError, encrypted.call_lock_sync, 
 
1144
                    no_options, None)
 
1145
            
 
1146
            # wrong password
 
1147
            self.assertRaises(GLib.GError, encrypted.call_unlock_sync, 
 
1148
                    'h4ckpassword', no_options, None)
 
1149
            # right password
 
1150
            clear_path = encrypted.call_unlock_sync('s3kr1t', 
 
1151
                    no_options, None)
 
1152
 
 
1153
            # check cleartext device info
 
1154
            clear_obj = self.client.get_object(clear_path)
 
1155
            self.assertEqual(clear_obj.get_property('encrypted'), None)
 
1156
            clear_block = clear_obj.get_property('block')
 
1157
            self.assertEqual(clear_block.get_property('id-type'), 'ext4')
 
1158
            self.assertEqual(clear_block.get_property('id-usage'), 'filesystem')
 
1159
            self.assertEqual(clear_block.get_property('id-label'), 'treasure')
 
1160
            self.assertNotEqual(clear_block.get_property('crypto-backing-device'), None)
 
1161
            clear_dev = clear_block.get_property('device')
 
1162
            self.assertNotEqual(clear_dev, None)
 
1163
            self.assertEqual(clear_block.get_property('id-uuid'),
 
1164
                    self.blkid(device=clear_dev)['ID_FS_UUID'])
 
1165
 
 
1166
            clear_fs = clear_obj.get_property('filesystem')
 
1167
            self.assertEqual(clear_fs.get_property('mount-points'), [])
 
1168
 
 
1169
            # check that we do not leak key information
 
1170
            udev_dump = subprocess.Popen(['udevadm', 'info', '--export-db'],
 
1171
                    stdout=subprocess.PIPE)
 
1172
            out = udev_dump.communicate()[0]
 
1173
            self.assertFalse(b's3kr1t' in out, 'password in udev properties')
 
1174
            self.assertFalse(b'essiv:sha' in out, 'key information in udev properties')
 
1175
 
 
1176
        finally:
 
1177
            # tear down cleartext device
 
1178
            encrypted.call_lock_sync(no_options, None)
 
1179
            self.assertFalse(os.path.exists(clear_dev))
 
1180
 
 
1181
    def test_luks_mount(self):
 
1182
        '''LUKS mount/unmount'''
 
1183
 
 
1184
        crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
 
1185
        encrypted = crypt_obj.get_property('encrypted')
 
1186
 
 
1187
        path = encrypted.call_unlock_sync('s3kr1t', 
 
1188
                no_options, None)
 
1189
        self.client.settle()
 
1190
        obj = self.client.get_object(path)
 
1191
        fs = obj.get_property('filesystem')
 
1192
        self.assertNotEqual(fs, None)
 
1193
 
 
1194
        # mount
 
1195
        mount_path = fs.call_mount_sync(no_options, None)
 
1196
 
 
1197
        try:
 
1198
            self.assertTrue('/media/' in mount_path)
 
1199
            self.assertTrue(mount_path.endswith('treasure'))
 
1200
            self.assertTrue(self.is_mountpoint(mount_path))
 
1201
            self.client.settle()
 
1202
            self.assertEqual(fs.get_property('mount-points'), [mount_path])
 
1203
 
 
1204
            # can't lock, busy
 
1205
            try:
 
1206
                encrypted.call_lock_sync(no_options, None)
 
1207
                self.fail('Lock() unexpectedly succeeded on mounted file system')
 
1208
            except GLib.GError as e:
 
1209
                self.assertTrue('UDisks2.Error.Failed' in e.message)
 
1210
        finally:
 
1211
            # umount
 
1212
            self.retry_busy(fs.call_unmount_sync, no_options, None)
 
1213
            self.client.settle()
 
1214
            self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
 
1215
            self.assertEqual(fs.get_property('mount-points'), [])
 
1216
 
 
1217
            # lock
 
1218
            encrypted.call_lock_sync(no_options, None)
 
1219
            self.client.settle()
 
1220
            self.assertEqual(self.client.get_object(path), None)
 
1221
 
 
1222
    def test_luks_forced_removal(self):
 
1223
        '''LUKS forced removal'''
 
1224
 
 
1225
        # unlock and mount it
 
1226
        crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
 
1227
        path = crypt_obj.get_property('encrypted').call_unlock_sync('s3kr1t', 
 
1228
                no_options, None)
 
1229
        try:
 
1230
            fs = self.client.get_object(path).get_property('filesystem')
 
1231
            mount_path = fs.call_mount_sync(no_options, None)
 
1232
            self.assertTrue('/media/' in mount_path)
 
1233
            self.assertTrue(mount_path.endswith('treasure'))
 
1234
 
 
1235
            # removal should clean up mounts
 
1236
            try:
 
1237
                self.remove_device(self.device)
 
1238
                self.assertFalse(os.path.exists(mount_path))
 
1239
                timeout = 50
 
1240
                while timeout > 0:
 
1241
                    if self.client.get_object(path) is None:
 
1242
                        break
 
1243
                    timeout -= 1
 
1244
                    # we do not have a main loop, and cannot currently use
 
1245
                    # g_main_context_get_default() from introspection, so
 
1246
                    # instead of refreshing self.client, get a new one
 
1247
                    self.client = UDisks.Client.new_sync(None)
 
1248
                    time.sleep(0.1)
 
1249
                self.assertGreater(timeout, 0, 'timeout waiting for object path %s to disappear' % path)
 
1250
            finally:
 
1251
                self.readd_devices()
 
1252
 
 
1253
            # after putting it back, it should be mountable again
 
1254
            crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
 
1255
            path = crypt_obj.get_property('encrypted').call_unlock_sync('s3kr1t', 
 
1256
                    no_options, None)
 
1257
            self.client.settle()
 
1258
            fs = self.client.get_object(path).get_property('filesystem')
 
1259
            mount_path = fs.call_mount_sync(no_options, None)
 
1260
            self.assertTrue('/media/' in mount_path)
 
1261
            self.assertTrue(mount_path.endswith('treasure'))
 
1262
 
 
1263
            # umount
 
1264
            self.retry_busy(fs.call_unmount_sync, no_options, None)
 
1265
            self.client.settle()
 
1266
            self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
 
1267
            self.assertEqual(fs.get_property('mount-points'), [])
 
1268
        finally:
 
1269
            # lock
 
1270
            crypt_obj.get_property('encrypted').call_lock_sync(
 
1271
                    no_options, None)
 
1272
            self.client.settle()
 
1273
            self.assertEqual(self.client.get_object(path), None)
 
1274
 
 
1275
# ----------------------------------------------------------------------------
 
1276
 
 
1277
class Polkit(UDisksTestCase, test_polkitd.PolkitTestCase):
 
1278
    '''Check operation with polkit.'''
 
1279
 
 
1280
    def test_internal_fs_forbidden(self):
 
1281
        '''Create FS on internal drive (forbidden)'''
 
1282
 
 
1283
        self.start_polkitd(['org.freedesktop.udisks2.modify-device'])
 
1284
 
 
1285
        options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', 'polkitno')})
 
1286
        with self.assertRaises(GLib.GError) as cm:
 
1287
            self.fs_create(None, 'ext4', options)
 
1288
        self.assertTrue('UDisks2.Error.NotAuthorized' in cm.exception.message,
 
1289
                cm.exception.message)
 
1290
 
 
1291
        # did not actually do anything
 
1292
        block = self.udisks_block()
 
1293
        self.assertNotEqual(block.get_property('id-label'), 'polkitno')
 
1294
 
 
1295
    def test_internal_fs_allowed(self):
 
1296
        '''Create FS on internal drive (allowed)'''
 
1297
 
 
1298
        self.start_polkitd(['org.freedesktop.udisks2.modify-device-system',
 
1299
            'org.freedesktop.udisks2.modify-device'])
 
1300
 
 
1301
        options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', 'polkityes')})
 
1302
        self.fs_create(None, 'ext4', options)
 
1303
        block = self.udisks_block()
 
1304
        self.assertEqual(block.get_property('id-usage'), 'filesystem')
 
1305
        self.assertEqual(block.get_property('id-type'), 'ext4')
 
1306
        self.assertEqual(block.get_property('id-label'), 'polkityes')
 
1307
 
 
1308
    def test_removable_fs(self):
 
1309
        '''Create FS on removable drive (allowed)'''
 
1310
 
 
1311
        self.start_polkitd(['org.freedesktop.udisks2.filesystem-mount'])
 
1312
 
 
1313
        # the scsi_debug CD drive content is the same as for the HD drive, but
 
1314
        # udev does not know about this; so give it a nudge to re-probe
 
1315
        subprocess.call(['udevadm', 'trigger', '--action=change',
 
1316
            '--sysname-match=' + os.path.basename(self.cd_device)])
 
1317
        self.sync()
 
1318
        self.sync()
 
1319
 
 
1320
        fs = self.udisks_filesystem(cd=True)
 
1321
        self.assertNotEqual(fs, None)
 
1322
        mount_path = fs.call_mount_sync(no_options, None)
 
1323
        self.assertTrue('/media/' in mount_path, mount_path)
 
1324
 
 
1325
        self.retry_busy(fs.call_unmount_sync, no_options, None)
 
1326
        self.client.settle()
 
1327
 
 
1328
# ----------------------------------------------------------------------------
 
1329
 
 
1330
if __name__ == '__main__':
 
1331
    argparser = argparse.ArgumentParser(description='udisks2 integration test suite')
 
1332
    argparser.add_argument('-l', '--log-file', dest='logfile',
 
1333
            help='write daemon log to a file')
 
1334
    argparser.add_argument('-w', '--no-workarounds',
 
1335
            action="store_true", default=False,
 
1336
            help='Disable workarounds for race conditions in the D-BUS API')
 
1337
    argparser.add_argument('testname', nargs='*',
 
1338
            help='name of test class or method (e. g. "Drive", "FS.test_ext2")')
 
1339
    args = argparser.parse_args()
 
1340
 
 
1341
    workaround_syncs = not args.no_workarounds
 
1342
 
 
1343
    UDisksTestCase.init(logfile=args.logfile)
 
1344
    if args.testname:
 
1345
        tests = unittest.TestLoader().loadTestsFromNames(args.testname,
 
1346
                __import__('__main__'))
 
1347
    else:
 
1348
        tests = unittest.TestLoader().loadTestsFromName('__main__')
 
1349
    if unittest.TextTestRunner(verbosity=2).run(tests).wasSuccessful():
 
1350
        sys.exit(0)
 
1351
    else:
 
1352
        sys.exit(1)
 
1353