1
from __future__ import absolute_import
6
from . import (LockBase, LockFailed, NotLocked, NotMyLock, LockTimeout,
9
class LinkLockFile(LockBase):
10
"""Lock access to a file using atomic property of link(2).
12
>>> lock = LinkLockFile('somefile')
13
>>> lock = LinkLockFile('somefile', threaded=False)
16
def acquire(self, timeout=None):
18
open(self.unique_name, "wb").close()
20
raise LockFailed("failed to create %s" % self.unique_name)
22
end_time = time.time()
23
if timeout is not None and timeout > 0:
27
# Try and create a hard link to it.
29
print 'making a hard link %s to %s' % (self.unique_name,
31
os.link(self.unique_name, self.lock_file)
33
# Link creation failed. Maybe we've double-locked?
34
nlinks = os.stat(self.unique_name).st_nlink
36
# The original link plus the one I created == 2. We're
40
# Otherwise the lock creation failed.
41
if timeout is not None and time.time() > end_time:
42
os.unlink(self.unique_name)
47
time.sleep(timeout is not None and timeout/10 or 0.1)
49
# Link creation succeeded. We're good to go.
53
if not self.is_locked():
55
elif not os.path.exists(self.unique_name):
57
os.unlink(self.unique_name)
58
os.unlink(self.lock_file)
61
return os.path.exists(self.lock_file)
63
def i_am_locking(self):
64
return (self.is_locked() and
65
os.path.exists(self.unique_name) and
66
os.stat(self.unique_name).st_nlink == 2)
69
if os.path.exists(self.lock_file):
70
os.unlink(self.lock_file)