13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program. If not, see <http://www.gnu.org/licenses/>.
16
"""Atomic counting semaphore files."""
16
"""PID files containing multiple PIDs."""
18
18
from __future__ import print_function
27
27
__metaclass__ = type
30
class SemaphoreError(Exception):
30
class MultiPIDFileError(Exception):
35
"""A shared lock which only opens when all users have unlocked."""
35
"""A file tracking multiple PIDs."""
37
37
def __init__(self, path):
39
39
self.lock_path = "%s.lock" % path
42
return "semaphore %s" % self.path
42
return "multipidfile %s" % self.path
44
44
def __enter__(self):
45
45
command = ["lockfile", "-r", "4", self.lock_path]
46
46
if subprocess.call(command) != 0:
47
raise SemaphoreError("Cannot acquire lock on %s!" % self)
47
raise MultiPIDFileError("Cannot acquire lock on %s!" % self)
49
49
def __exit__(self, unused_exc_type, unused_exc_value, unused_exc_tb):
50
50
osextras.unlink_force(self.lock_path)
55
55
"Called _read on %s without locking!" % self)
57
57
with open(self.path) as fd:
58
pids = set(int(line) for line in fd)
59
return set(pid for pid in pids if osextras.pid_exists(pid))
59
60
except IOError as e:
60
61
if e.errno == errno.ENOENT:
64
def _add(self, offset):
65
def _write(self, pids):
65
66
# Must be called within context manager lock.
66
67
assert os.path.exists(self.lock_path), (
67
"Called _add on %s without locking!" % self)
69
with open(self.path, "w") as fd:
70
print(cur + offset, file=fd)
68
"Called _write on %s without locking!" % self)
70
with open(self.path, "w") as fd:
71
for pid in sorted(pids):
74
osextras.unlink_force(self.path)
75
"""Return current state of semaphore."""
78
"""Return current set of tracked PIDs."""
77
80
return self._read()
79
def test_increment(self):
80
"""Test, increment, return state of test."""
86
def decrement_test(self):
87
"""Decrement, test, return state of test.
89
It is an error to call decrement-test on a semaphore that is already
95
osextras.unlink_force(self.path)
97
"Attempted to decrement %s when already zero!" % self)
100
osextras.unlink_force(self.path)
82
def test_add(self, pid):
83
"""Test, add PID, return state of test.
85
It is an error to add a PID that is already present.
90
raise MultiPIDFileError(
91
"Attempted to add PID %d to %s which was already "
92
"present!" % (pid, self))
98
def remove_test(self, pid):
99
"""Remove PID, test, return state of test.
101
It is an error to remove a PID that is not already present.
108
raise MultiPIDFileError(
109
"Attempted to remove PID %d from %s which was not "
110
"present!" % (pid, self))
103
114
@contextlib.contextmanager
106
yield self.test_increment()
117
yield self.test_add(pid)
108
self.decrement_test()
119
self.remove_test(pid)