41
39
cfg.BoolOpt('disable_process_locking', default=False,
42
help='Whether to disable inter-process locks'),
40
help='Whether to disable inter-process locks.'),
43
41
cfg.StrOpt('lock_path',
44
42
default=os.environ.get("CEILOMETER_LOCK_PATH"),
45
43
help=('Directory to use for lock files.'))
76
74
self.lockfile = None
78
basedir = os.path.dirname(self.fname)
80
if not os.path.exists(basedir):
81
fileutils.ensure_tree(basedir)
82
LOG.info(_('Created lock path: %s'), basedir)
80
84
self.lockfile = open(self.fname, 'w')
86
90
# Also upon reading the MSDN docs for locking(), it seems
87
91
# to have a laughable 10 attempts "blocking" mechanism.
93
LOG.debug(_('Got file lock "%s"'), self.fname)
90
95
except IOError as e:
91
96
if e.errno in (errno.EACCES, errno.EAGAIN):
92
97
# external locks synchronise things like iptables
93
98
# updates - give it some time to prevent busy spinning
98
def __exit__(self, exc_type, exc_val, exc_tb):
101
raise threading.ThreadError(_("Unable to acquire lock on"
102
" `%(filename)s` due to"
105
'filename': self.fname,
101
116
self.lockfile.close()
117
LOG.debug(_('Released file lock "%s"'), self.fname)
103
119
LOG.exception(_("Could not release the acquired lock `%s`"),
122
def __exit__(self, exc_type, exc_val, exc_tb):
106
125
def trylock(self):
107
126
raise NotImplementedError()
137
156
_semaphores_lock = threading.Lock()
140
@contextlib.contextmanager
141
def external_lock(name, lock_file_prefix=None, lock_path=None):
159
def external_lock(name, lock_file_prefix=None):
142
160
with internal_lock(name):
143
LOG.debug(_('Attempting to grab file lock "%(lock)s"'),
161
LOG.debug(_('Attempting to grab external lock "%(lock)s"'),
146
# We need a copy of lock_path because it is non-local
147
local_lock_path = lock_path or CONF.lock_path
148
if not local_lock_path:
149
raise cfg.RequiredOptError('lock_path')
151
if not os.path.exists(local_lock_path):
152
fileutils.ensure_tree(local_lock_path)
153
LOG.info(_('Created lock path: %s'), local_lock_path)
155
def add_prefix(name, prefix):
158
sep = '' if prefix.endswith('-') else '-'
159
return '%s%s%s' % (prefix, sep, name)
161
164
# NOTE(mikal): the lock name cannot contain directory
163
lock_file_name = add_prefix(name.replace(os.sep, '_'),
166
lock_file_path = os.path.join(local_lock_path, lock_file_name)
169
lock = InterProcessLock(lock_file_path)
171
LOG.debug(_('Got file lock "%(lock)s" at %(path)s'),
172
{'lock': name, 'path': lock_file_path})
175
LOG.debug(_('Released file lock "%(lock)s" at %(path)s'),
176
{'lock': name, 'path': lock_file_path})
179
@contextlib.contextmanager
166
name = name.replace(os.sep, '_')
168
sep = '' if lock_file_prefix.endswith('-') else '-'
169
name = '%s%s%s' % (lock_file_prefix, sep, name)
171
if not CONF.lock_path:
172
raise cfg.RequiredOptError('lock_path')
174
lock_file_path = os.path.join(CONF.lock_path, name)
176
return InterProcessLock(lock_file_path)
180
179
def internal_lock(name):
181
180
with _semaphores_lock:
185
184
sem = threading.Semaphore()
186
185
_semaphores[name] = sem
189
LOG.debug(_('Got semaphore "%(lock)s"'), {'lock': name})
191
# NOTE(mikal): I know this looks odd
192
if not hasattr(local.strong_store, 'locks_held'):
193
local.strong_store.locks_held = []
194
local.strong_store.locks_held.append(name)
199
local.strong_store.locks_held.remove(name)
187
LOG.debug(_('Got semaphore "%(lock)s"'), {'lock': name})
202
191
@contextlib.contextmanager
203
def lock(name, lock_file_prefix=None, external=False, lock_path=None):
192
def lock(name, lock_file_prefix=None, external=False):
204
193
"""Context based lock
206
195
This function yields a `threading.Semaphore` instance (if we don't use
214
203
should work across multiple processes. This means that if two different
215
204
workers both run a a method decorated with @synchronized('mylock',
216
205
external=True), only one of them will execute at a time.
218
:param lock_path: The lock_path keyword argument is used to specify a
219
special location for external lock files to live. If nothing is set, then
220
CONF.lock_path is used as a default.
222
207
if external and not CONF.disable_process_locking:
223
with external_lock(name, lock_file_prefix, lock_path) as lock:
208
lock = external_lock(name, lock_file_prefix)
226
with internal_lock(name) as lock:
230
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
210
lock = internal_lock(name)
215
def synchronized(name, lock_file_prefix=None, external=False):
231
216
"""Synchronization decorator.
233
218
Decorating a method like so::
255
240
@functools.wraps(f)
256
241
def inner(*args, **kwargs):
258
with lock(name, lock_file_prefix, external, lock_path):
243
with lock(name, lock_file_prefix, external):
259
244
LOG.debug(_('Got semaphore / lock "%(function)s"'),
260
245
{'function': f.__name__})
261
246
return f(*args, **kwargs)