~xubuntu-dev/ubuntu-cdimage/xubuntu-base

« back to all changes in this revision

Viewing changes to lib/cdimage/tests/test_multipidfile.py

  • Committer: Sean Davis
  • Date: 2018-03-28 10:32:07 UTC
  • mfrom: (1559.1.156 ubuntu-cdimage)
  • Revision ID: smd.seandavis@gmail.com-20180328103207-o6s9d6h0hxxh8eqc
Merge lp:ubuntu-cdimage rev 1715

Show diffs side-by-side

Legend: added lines

Legend: removed lines

Lines of Context:
1
1
#! /usr/bin/python
2
2
 
3
 
# Copyright (C) 2013 Canonical Ltd.
 
3
# Copyright (C) 2013, 2016 Canonical Ltd.
4
4
# Author: Colin Watson <cjwatson@ubuntu.com>
5
5
 
6
6
# This program is free software: you can redistribute it and/or modify
15
15
# You should have received a copy of the GNU General Public License
16
16
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
17
 
18
 
"""Unit tests for cdimage.semaphore."""
 
18
"""Unit tests for cdimage.multipidfile."""
19
19
 
20
20
from __future__ import print_function
21
21
 
26
26
except ImportError:
27
27
    import mock
28
28
 
29
 
from cdimage.semaphore import Semaphore, SemaphoreError
 
29
from cdimage.multipidfile import MultiPIDFile, MultiPIDFileError
30
30
from cdimage.tests.helpers import TestCase, mkfile
31
31
 
32
32
__metaclass__ = type
33
33
 
34
34
 
35
 
class TestSemaphore(TestCase):
 
35
class TestMultiPIDFile(TestCase):
36
36
    def setUp(self):
37
 
        super(TestSemaphore, self).setUp()
 
37
        super(TestMultiPIDFile, self).setUp()
38
38
        self.use_temp_dir()
39
 
        self.semaphore = Semaphore(os.path.join(self.temp_dir, "sem"))
 
39
        self.multipidfile = MultiPIDFile(os.path.join(self.temp_dir, "pids"))
40
40
 
41
41
    def test_str(self):
42
 
        """A Semaphore stringifies to 'semaphore PATH'."""
 
42
        """A MultiPIDFile stringifies to 'multipidfile PATH'."""
43
43
        self.assertEqual(
44
 
            "semaphore %s" % self.semaphore.path, str(self.semaphore))
 
44
            "multipidfile %s" % self.multipidfile.path, str(self.multipidfile))
45
45
 
46
46
    def test_context_manager(self):
47
 
        """A Semaphore operates as a context manager, with locking."""
48
 
        self.assertFalse(os.path.exists(self.semaphore.lock_path))
49
 
        with self.semaphore:
50
 
            self.assertTrue(os.path.exists(self.semaphore.lock_path))
51
 
        self.assertFalse(os.path.exists(self.semaphore.lock_path))
 
47
        """A MultiPIDFile operates as a context manager, with locking."""
 
48
        self.assertFalse(os.path.exists(self.multipidfile.lock_path))
 
49
        with self.multipidfile:
 
50
            self.assertTrue(os.path.exists(self.multipidfile.lock_path))
 
51
        self.assertFalse(os.path.exists(self.multipidfile.lock_path))
52
52
 
53
53
    @mock.patch("subprocess.call", return_value=1)
54
54
    def test_lock_failure(self, mock_call):
55
 
        """__enter__ raises SemaphoreError if the lock is already held."""
56
 
        self.assertRaises(SemaphoreError, self.semaphore.__enter__)
 
55
        """__enter__ raises MultiPIDFileError if the lock is already held."""
 
56
        self.assertRaises(MultiPIDFileError, self.multipidfile.__enter__)
57
57
 
58
58
    def test_read_requires_lock(self):
59
59
        """_read must be called within the lock."""
60
 
        self.assertRaises(AssertionError, self.semaphore._read)
 
60
        self.assertRaises(AssertionError, self.multipidfile._read)
61
61
 
62
62
    def test_read_missing(self):
63
 
        """A missing semaphore file reads as zero."""
64
 
        with self.semaphore:
65
 
            self.assertEqual(0, self.semaphore._read())
66
 
 
67
 
    def test_read_existing(self):
68
 
        """An existing semaphore file reads as its integer value."""
69
 
        with mkfile(self.semaphore.path) as fd:
70
 
            print(10, file=fd)
71
 
        with self.semaphore:
72
 
            self.assertEqual(10, self.semaphore._read())
73
 
 
74
 
    def test_add_requires_lock(self):
75
 
        """_add must be called within the lock."""
76
 
        self.assertRaises(AssertionError, self.semaphore._add, 1)
77
 
 
78
 
    def test_add_missing(self):
79
 
        """Adding to a missing semaphore file treats it as initially zero."""
80
 
        with self.semaphore:
81
 
            self.assertEqual(1, self.semaphore._add(1))
82
 
            self.assertEqual(1, self.semaphore._read())
83
 
 
84
 
    def test_add_existing(self):
85
 
        """Adding to an existing semaphore file adjusts its contents."""
86
 
        with mkfile(self.semaphore.path) as fd:
87
 
            print(10, file=fd)
88
 
        with self.semaphore:
89
 
            self.assertEqual(9, self.semaphore._add(-1))
90
 
            self.assertEqual(9, self.semaphore._read())
 
63
        """A missing MultiPIDFile reads as empty."""
 
64
        with self.multipidfile:
 
65
            self.assertEqual(set(), self.multipidfile._read())
 
66
 
 
67
    @mock.patch("cdimage.osextras.pid_exists", return_value=True)
 
68
    def test_read_existing(self, mock_pid_exists):
 
69
        """An existing MultiPIDFile reads as the set of PIDs it contains."""
 
70
        with mkfile(self.multipidfile.path) as fd:
 
71
            print(1, file=fd)
 
72
            print(2, file=fd)
 
73
        with self.multipidfile:
 
74
            self.assertEqual(set([1, 2]), self.multipidfile._read())
 
75
 
 
76
    @mock.patch("cdimage.osextras.pid_exists")
 
77
    def test_read_skips_dead_pids(self, mock_pid_exists):
 
78
        """Only live PIDs are returned."""
 
79
        with mkfile(self.multipidfile.path) as fd:
 
80
            print(1, file=fd)
 
81
            print(2, file=fd)
 
82
        mock_pid_exists.side_effect = lambda pid: pid == 2
 
83
        with self.multipidfile:
 
84
            self.assertEqual(set([2]), self.multipidfile._read())
 
85
 
 
86
    def test_write_requires_lock(self):
 
87
        """_write must be called within the lock."""
 
88
        self.assertRaises(AssertionError, self.multipidfile._write, 1)
 
89
 
 
90
    @mock.patch("cdimage.osextras.pid_exists", return_value=True)
 
91
    def test_write_missing(self, mock_pid_exists):
 
92
        """Writing to a missing MultiPIDFile works."""
 
93
        with self.multipidfile:
 
94
            self.multipidfile._write(set([1, 2]))
 
95
            self.assertEqual(set([1, 2]), self.multipidfile._read())
 
96
 
 
97
    @mock.patch("cdimage.osextras.pid_exists", return_value=True)
 
98
    def test_write_existing(self, mock_pid_exists):
 
99
        """Writing to an existing MultiPIDFile adjusts its contents."""
 
100
        with mkfile(self.multipidfile.path) as fd:
 
101
            print(1, file=fd)
 
102
        with self.multipidfile:
 
103
            self.multipidfile._write(set([1, 2]))
 
104
            self.assertEqual(set([1, 2]), self.multipidfile._read())
91
105
 
92
106
    def test_state(self):
93
107
        """The state can be fetched without explicit locking."""
94
 
        self.assertEqual(0, self.semaphore.state)
95
 
 
96
 
    def test_test_increment(self):
97
 
        self.assertEqual(0, self.semaphore.test_increment())
98
 
        self.assertEqual(1, self.semaphore.state)
99
 
        self.assertEqual(1, self.semaphore.test_increment())
100
 
        self.assertEqual(2, self.semaphore.state)
101
 
 
102
 
    def test_decrement_test(self):
103
 
        with mkfile(self.semaphore.path) as fd:
 
108
        self.assertEqual(set(), self.multipidfile.state)
 
109
 
 
110
    @mock.patch("cdimage.osextras.pid_exists", return_value=True)
 
111
    def test_test_add(self, mock_pid_exists):
 
112
        self.assertFalse(self.multipidfile.test_add(1))
 
113
        self.assertEqual(set([1]), self.multipidfile.state)
 
114
        self.assertTrue(self.multipidfile.test_add(2))
 
115
        self.assertEqual(set([1, 2]), self.multipidfile.state)
 
116
 
 
117
    @mock.patch("cdimage.osextras.pid_exists", return_value=True)
 
118
    def test_test_add_error_on_existing_pid(self, mock_pid_exists):
 
119
        with mkfile(self.multipidfile.path) as fd:
 
120
            print(1, file=fd)
 
121
        self.assertRaises(MultiPIDFileError, self.multipidfile.test_add, 1)
 
122
 
 
123
    @mock.patch("cdimage.osextras.pid_exists", return_value=True)
 
124
    def test_remove_test(self, mock_pid_exists):
 
125
        with mkfile(self.multipidfile.path) as fd:
 
126
            print(1, file=fd)
104
127
            print(2, file=fd)
105
 
        self.assertEqual(1, self.semaphore.decrement_test())
106
 
        self.assertEqual(1, self.semaphore.state)
107
 
        self.assertEqual(0, self.semaphore.decrement_test())
108
 
        self.assertEqual(0, self.semaphore.state)
109
 
 
110
 
    def test_decrement_test_error_on_zero(self):
111
 
        """decrement_test raises SemaphoreError if already zero."""
112
 
        self.assertRaises(SemaphoreError, self.semaphore.decrement_test)
113
 
 
114
 
    def test_round_trip(self):
115
 
        """After a +/- round-trip, the semaphore path is missing."""
116
 
        self.assertEqual(0, self.semaphore.test_increment())
117
 
        self.assertEqual(0, self.semaphore.decrement_test())
118
 
        self.assertFalse(os.path.exists(self.semaphore.path))
119
 
 
120
 
    def test_held(self):
121
 
        self.assertEqual(0, self.semaphore.state)
122
 
        with self.semaphore.held() as state_zero:
123
 
            self.assertEqual(0, state_zero)
124
 
            self.assertEqual(1, self.semaphore.state)
125
 
            with self.semaphore.held() as state_one:
126
 
                self.assertEqual(1, state_one)
127
 
                self.assertEqual(2, self.semaphore.state)
128
 
            self.assertEqual(1, self.semaphore.state)
129
 
        self.assertEqual(0, self.semaphore.state)
 
128
        self.assertTrue(self.multipidfile.remove_test(2))
 
129
        self.assertEqual(set([1]), self.multipidfile.state)
 
130
        self.assertFalse(self.multipidfile.remove_test(1))
 
131
        self.assertEqual(set(), self.multipidfile.state)
 
132
 
 
133
    def test_remove_test_error_on_empty(self):
 
134
        """remove_test raises MultiPIDFileError if already empty."""
 
135
        self.assertRaises(MultiPIDFileError, self.multipidfile.remove_test, 1)
 
136
 
 
137
    @mock.patch("cdimage.osextras.pid_exists", return_value=True)
 
138
    def test_remove_test_error_on_missing_pid(self, mock_pid_exists):
 
139
        with mkfile(self.multipidfile.path) as fd:
 
140
            print(1, file=fd)
 
141
        self.assertRaises(MultiPIDFileError, self.multipidfile.remove_test, 2)
 
142
 
 
143
    @mock.patch("cdimage.osextras.pid_exists", return_value=True)
 
144
    def test_round_trip(self, mock_pid_exists):
 
145
        """After a +/- round-trip, the MultiPIDFile path is missing."""
 
146
        self.assertFalse(self.multipidfile.test_add(1))
 
147
        self.assertFalse(self.multipidfile.remove_test(1))
 
148
        self.assertFalse(os.path.exists(self.multipidfile.path))
 
149
 
 
150
    @mock.patch("cdimage.osextras.pid_exists", return_value=True)
 
151
    def test_held(self, mock_pid_exists):
 
152
        self.assertEqual(set(), self.multipidfile.state)
 
153
        with self.multipidfile.held(1) as state_zero:
 
154
            self.assertFalse(state_zero)
 
155
            self.assertEqual(set([1]), self.multipidfile.state)
 
156
            with self.multipidfile.held(2) as state_one:
 
157
                self.assertTrue(state_one)
 
158
                self.assertEqual(set([1, 2]), self.multipidfile.state)
 
159
            self.assertEqual(set([1]), self.multipidfile.state)
 
160
        self.assertEqual(set(), self.multipidfile.state)