2
# Module providing various facilities to other parts of the package
4
# multiprocessing/util.py
6
# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
12
import threading # we want threading to install its
13
# cleanup function before multiprocessing does
15
from multiprocessing.process import current_process, active_children
18
'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
19
'log_to_stderr', 'get_temp_dir', 'register_after_fork',
20
'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal'
33
LOGGER_NAME = 'multiprocessing'
34
DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'
37
_log_to_stderr = False
39
def sub_debug(msg, *args):
41
_logger.log(SUBDEBUG, msg, *args)
43
def debug(msg, *args):
45
_logger.log(DEBUG, msg, *args)
49
_logger.log(INFO, msg, *args)
51
def sub_warning(msg, *args):
53
_logger.log(SUBWARNING, msg, *args)
57
Returns logger used by multiprocessing
62
import logging, atexit
64
# XXX multiprocessing should cleanup before logging
65
if hasattr(atexit, 'unregister'):
66
atexit.unregister(_exit_function)
67
atexit.register(_exit_function)
69
atexit._exithandlers.remove((_exit_function, (), {}))
70
atexit._exithandlers.append((_exit_function, (), {}))
73
_logger = logging.getLogger(LOGGER_NAME)
77
def _check_logger_class():
79
Make sure process name is recorded when loggers are used
81
# XXX This function is unnecessary once logging is patched
83
if hasattr(logging, 'multiprocessing'):
86
logging._acquireLock()
88
OldLoggerClass = logging.getLoggerClass()
89
if not getattr(OldLoggerClass, '_process_aware', False):
90
class ProcessAwareLogger(OldLoggerClass):
92
def makeRecord(self, *args, **kwds):
93
record = OldLoggerClass.makeRecord(self, *args, **kwds)
94
record.processName = current_process()._name
96
logging.setLoggerClass(ProcessAwareLogger)
98
logging._releaseLock()
100
def log_to_stderr(level=None):
102
Turn on logging and add a handler which prints to stderr
104
global _log_to_stderr
106
logger = get_logger()
107
formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
108
handler = logging.StreamHandler()
109
handler.setFormatter(formatter)
110
logger.addHandler(handler)
111
if level is not None:
112
logger.setLevel(level)
113
_log_to_stderr = True
116
# Function returning a temp directory which will be removed on exit
120
# get name of a temp directory which will be automatically cleaned up
121
if current_process()._tempdir is None:
122
import shutil, tempfile
123
tempdir = tempfile.mkdtemp(prefix='pymp-')
124
info('created temp directory %s', tempdir)
125
Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
126
current_process()._tempdir = tempdir
127
return current_process()._tempdir
130
# Support for reinitialization of objects when bootstrapping a child process
133
_afterfork_registry = weakref.WeakValueDictionary()
134
_afterfork_counter = itertools.count()
136
def _run_after_forkers():
137
items = list(_afterfork_registry.items())
139
for (index, ident, func), obj in items:
143
info('after forker raised exception %s', e)
145
def register_after_fork(obj, func):
    """Record `func` for `obj` in the after-fork registry.

    The entry is stored in `_afterfork_registry` under the key
    ``(sequence_number, id(obj), func)`` with `obj` as the value.  The
    registry is a `weakref.WeakValueDictionary`, so `obj` must be
    weak-referenceable and `func` must be hashable; the entry does not
    keep `obj` alive.

    NOTE(review): `func` is presumably invoked as ``func(obj)`` by
    `_run_after_forkers` in the child process -- confirm against that
    function.
    """
    # Use the builtin next() rather than the iterator's .next() method:
    # the method only exists on Python 2, the builtin works on 2.6+ and 3.
    key = (next(_afterfork_counter), id(obj), func)
    _afterfork_registry[key] = obj
149
# Finalization using weakrefs
152
_finalizer_registry = {}
153
_finalizer_counter = itertools.count()
156
class Finalize(object):
158
Class which supports object finalization using weakrefs
160
def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
161
assert exitpriority is None or type(exitpriority) is int
164
self._weakref = weakref.ref(obj, self)
166
assert exitpriority is not None
168
self._callback = callback
170
self._kwargs = kwargs or {}
171
self._key = (exitpriority, _finalizer_counter.next())
173
_finalizer_registry[self._key] = self
175
def __call__(self, wr=None):
177
Run the callback unless it has already been called or cancelled
180
del _finalizer_registry[self._key]
182
sub_debug('finalizer no longer registered')
184
sub_debug('finalizer calling %s with args %s and kwargs %s',
185
self._callback, self._args, self._kwargs)
186
res = self._callback(*self._args, **self._kwargs)
187
self._weakref = self._callback = self._args = \
188
self._kwargs = self._key = None
193
Cancel finalization of the object
196
del _finalizer_registry[self._key]
200
self._weakref = self._callback = self._args = \
201
self._kwargs = self._key = None
203
def still_active(self):
205
Return whether this finalizer is still waiting to invoke callback
207
return self._key in _finalizer_registry
211
obj = self._weakref()
212
except (AttributeError, TypeError):
216
return '<Finalize object, dead>'
218
x = '<Finalize object, callback=%s' % \
219
getattr(self._callback, '__name__', self._callback)
221
x += ', args=' + str(self._args)
223
x += ', kwargs=' + str(self._kwargs)
224
if self._key[0] is not None:
225
x += ', exitprority=' + str(self._key[0])
229
def _run_finalizers(minpriority=None):
231
Run all finalizers whose exit priority is not None and at least minpriority
233
Finalizers with highest priority are called first; finalizers with
234
the same priority will be called in reverse order of creation.
236
if minpriority is None:
237
f = lambda p : p[0][0] is not None
239
f = lambda p : p[0][0] is not None and p[0][0] >= minpriority
241
items = [x for x in _finalizer_registry.items() if f(x)]
242
items.sort(reverse=True)
244
for key, finalizer in items:
245
sub_debug('calling %s', finalizer)
250
traceback.print_exc()
252
if minpriority is None:
253
_finalizer_registry.clear()
261
Returns true if the process is shutting down
263
return _exiting or _exiting is None
267
def _exit_function():
270
info('process shutting down')
271
debug('running all "atexit" finalizers with priority >= 0')
274
for p in active_children():
276
info('calling terminate() for daemon %s', p.name)
279
for p in active_children():
280
info('calling join() for process %s', p.name)
283
debug('running the remaining "atexit" finalizers')
286
atexit.register(_exit_function)
289
# Some fork aware types
292
class ForkAwareThreadLock(object):
    """Holder for a `threading.Lock` exposing `acquire` and `release`.

    Each instance registers its own `__init__` through
    `register_after_fork`, so re-running it replaces `_lock`, `acquire`
    and `release` with those of a newly created lock.
    """

    def __init__(self):
        # Build the lock first, then publish it and its bound methods.
        fresh_lock = threading.Lock()
        self._lock = fresh_lock
        self.acquire = fresh_lock.acquire
        self.release = fresh_lock.release
        register_after_fork(self, ForkAwareThreadLock.__init__)
299
class ForkAwareLocal(threading.local):
    """`threading.local` subclass that registers a state-clearing hook.

    On construction the instance registers, through
    `register_after_fork`, a callback that empties the `__dict__` of the
    object it is given.
    """

    def __init__(self):
        def _wipe_attributes(obj):
            # Same effect as the one-line lambda: clear and return None.
            return obj.__dict__.clear()

        register_after_fork(self, _wipe_attributes)

    def __reduce__(self):
        # Pickle as "call the class with no arguments"; no state is kept.
        cls = type(self)
        return cls, ()